PageRank will be our example!
Phases of a MapReduce job
This happens N times...
In 2010 Google introduces Pregel
- Based on Valiant's BSP
- Batch-oriented processing
- Vertex-centric, high-level programming model
- Computation happens in-memory
- Runs on own infrastructure
- Master/slave architecture
Processes are two-state automata
Computation stops when all processes are halted
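A minimal sketch of that two-state automaton; the enum and method names below are illustrative, not Giraph internals:

// Illustrative sketch of the vertex lifecycle, not actual Giraph internals.
enum VertexState { ACTIVE, HALTED }

class VertexLifecycle {
  VertexState state = VertexState.ACTIVE;

  // Called from the user's compute(): the vertex goes to sleep...
  void voteToHalt() { state = VertexState.HALTED; }

  // ...and is woken up again by the framework when a message arrives for it.
  void onMessageReceived() { state = VertexState.ACTIVE; }
}
// The job terminates when every vertex is HALTED and no messages are in flight.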
You want an example? Find the maximum value!
It comes with a simple Vertex-centric API
class Vertex<I, V, E, M>
{
  abstract void compute(Iterable<M> messages);

  long superstep();
  I getVertexID();
  V getVertexValue();
  void setVertexValue(V value);
  Iterable<I> getOutNeighbors();
  void sendMessageTo(I dest, M msg);
  void voteToHalt();
}
- You define VertexInputFormat
- You define VertexOutputFormat
- You define a Combiner
- You define an Aggregator
- You have a Graph Mutator API
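As an example of the Combiner hook, here is a hedged sketch of a combiner that pre-sums messages headed for the same vertex; the class and method names are illustrative, not Giraph's actual API:

// Illustrative only: combine messages addressed to the same vertex on the
// sending Worker, so fewer messages cross the network.
class DoubleSumCombiner
{
  Double combine(I targetVertexId, Iterable<Double> messages) {
    double sum = 0;
    for (Double msg : messages) {
      sum += msg;
    }
    return sum;
  }
}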
MaxValue pseudo-code
class MaxValueVertex extends Vertex<I, V, E, M>
{
  void compute(Iterable<M> messages) {
    // Start from the current value and keep the largest one seen so far.
    V maxValue = getVertexValue();
    for (M message : messages) {
      V msgValue = message.getValue();
      if (msgValue.compareTo(maxValue) > 0) {
        maxValue = msgValue;
      }
    }
    // Only propagate if the value grew; otherwise stay silent.
    if (maxValue.compareTo(getVertexValue()) > 0) {
      setVertexValue(maxValue);
      for (I endpoint : getOutNeighbors()) {
        sendMessageTo(endpoint, getVertexValue());
      }
    }
    // Go to sleep until a new (larger) value arrives.
    voteToHalt();
  }
}
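Since PageRank is our running example, here is a hedged sketch of it in the same vertex-centric API; NUM_VERTICES, the damping factor and the superstep cutoff are assumptions for illustration, not part of the API above:

// A sketch only: assumes the vertex value is initialized to 1.0 / NUM_VERTICES
// when the graph is loaded, and that NUM_VERTICES is known up front.
class PageRankVertex extends Vertex<I, Double, E, Double>
{
  static final long NUM_VERTICES = 1000000L;
  static final int MAX_SUPERSTEPS = 30;

  void compute(Iterable<Double> messages) {
    if (superstep() > 0) {
      // Sum the contributions received from the neighbors...
      double sum = 0;
      for (Double msg : messages) {
        sum += msg;
      }
      // ...and apply the damping formula.
      setVertexValue(0.15 / NUM_VERTICES + 0.85 * sum);
    }
    if (superstep() < MAX_SUPERSTEPS) {
      // Send MyPageRank / #Neighbors to every out-neighbor.
      long numNeighbors = 0;
      for (I neighbor : getOutNeighbors()) {
        numNeighbors++;
      }
      for (I neighbor : getOutNeighbors()) {
        sendMessageTo(neighbor, getVertexValue() / numNeighbors);
      }
    } else {
      voteToHalt();
    }
  }
}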
Master-slave architecture
Anatomy of a Superstep
Cool! Where do I download it?
Questions?
http://incubator.apache.org/giraph
http://issues.apache.org/jira/browse/GIRAPH
claudio@apache.org
@claudiomartella
Current and Future Work
Failure!
- Out-of-core messages
- First official 0.1 release
- Algorithms library
- More integration: HCatalog? HBase?
- More contributors!
Fault-tolerance through checkpointing
- At user-defined intervals
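A minimal sketch of what the Worker's checkpointIfNecessary(superstep) step could look like; checkpointFrequency and the write* helpers are hypothetical names, not Giraph's actual API:

// Illustrative sketch only.
long checkpointFrequency;   // user-defined, e.g. checkpoint every 5 supersteps

void checkpointIfNecessary(long superstep) {
  if (checkpointFrequency > 0 && superstep % checkpointFrequency == 0) {
    String path = checkpointPath(superstep);
    writeVerticesToHDFS(path);           // vertex values + edges
    writePendingMessagesToHDFS(path);    // messages queued for the next superstep
  }
  // On failure, all Workers restart from the last completed checkpoint.
}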
Who am I?
Apache Giraph:
Distributed Graph Processing
in the Cloud
Giraph is a loose implementation of Pregel, a distributed graph processing framework complementary to MapReduce, which is not well suited for graph algorithms.
Apache Girapher: PPMC Member & Committer
PhD Student: LSDS Group, Vrije Universiteit Amsterdam
Interested in Complex Networks & Distributed and Scalable Infrastructures
Email: claudio@apache.org
Blog: http://blog.acaro.org
Twitter: @claudiomartella
Github: https://github.com/claudiomartella (dbpedia4neo etc.)
FOSDEM 2012, Graph Processing Room
5 February 2012
Claudio Martella
Clear?
Let's see the problem...
Master
Responsible for Coordination:
- Assigns Partitions to Workers
- Coordinates synchronization
- Requests checkpoints
- Aggregates Aggregator values
- Collects health statuses
- etc.
A new job is started...
Drawbacks:
- Job bootstrap
- Disk is hit ~6 times
- Data is sorted
- Graph is passed through
Data is sent to Reducers (and hits the disk...)
JobTracker & NameNode
Data is scanned from local disk:
Key-Value: < nodeID, Node >
Node: < PageRank, [nodeID_i, nodeID_k, ..., nodeID_n] >
Mapper emits intermediate values:
- MyPageRank / #Neighbors
- Key-Value graph structure
Data is sorted to disk...
Reducer emits to (remote) disk...
- Sums the PRs from neighbors
- Joins it with graph structure
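To make those phases concrete, here is a hedged sketch of one PageRank iteration as a MapReduce job; Node and the emit() calls are illustrative pseudo-code in the style of these slides, not a specific Hadoop API:

// Illustrative pseudo-code for one PageRank iteration in MapReduce;
// Node wraps <PageRank, [neighbor IDs]> as in the key-value layout above.
void map(long nodeId, Node node) {
  // Assumes every node has at least one out-neighbor.
  double contribution = node.getPageRank() / node.getNeighbors().size();
  for (long neighborId : node.getNeighbors()) {
    emit(neighborId, contribution);   // MyPageRank / #Neighbors
  }
  emit(nodeId, node);                 // ship the graph structure along too
}

void reduce(long nodeId, Iterable<Object> values) {
  double sum = 0;
  Node node = null;
  for (Object value : values) {
    if (value instanceof Node) {
      node = (Node) value;            // join with the graph structure
    } else {
      sum += (Double) value;          // sum the PRs from the neighbors
    }
  }
  node.setPageRank(0.15 + 0.85 * sum);
  emit(nodeId, node);                 // written back to (remote) disk
}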
Zookeeper
Responsible for Computation State:
- Partition - Worker mapping
- Global state: #superstep...
- Checkpoints paths
- Aggregator Values
- Statistics
- etc.
Worker
void map()
{
  superstep = 0;
  vertices = loadPartitionFromHDFS(path);
  do {
    // Run the user's compute() on every vertex that is still active.
    for (Vertex v : vertices.getActive()) {
      v.compute();
    }
    // Flush outgoing messages, checkpoint if due, then wait at the barrier.
    sendMessagesToWorkers();
    checkpointIfNecessary(superstep);
    synchronize();
    superstep++;
  } while (superstep < MAX_SUPERSTEP && !allHalted(vertices));
  dumpResultsToHDFS(vertices);
}
It runs on Hadoop:
- in-memory
- leverages 100% of existing clusters
- it's a single Map-only job
- can be a job in a pipeline: M/R, Hive?
- fault-tolerant
- Zookeeper for state, No SPOF
- Incubated since summer 2011
- Written in Java
- Implements Pregel's API (loosely...)
- Runs on existing MapReduce infrastructure
- Active community from Yahoo!, Facebook, LinkedIn, Twitter ...
Apache Giraph
- There's Giraph
- There's Hama
- There's GoldenOrb
- There's Signal/Collect
It's proprietary... but!
Vertices are assigned to Workers
- by hash partitioning
- by range partitioning
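A minimal sketch of the hash-based assignment; the method name is illustrative:

// Illustrative: assign a vertex to a partition (and thus a Worker) by hashing its ID.
int partitionFor(I vertexId, int numPartitions) {
  return (vertexId.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
// Range partitioning would instead give each Worker a contiguous range of vertex IDs,
// which can preserve locality when the IDs encode it.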
Master assigns and coordinates,
while Workers execute vertices and communicate with each other directly
Here are the answers!
- Load the graph from disk
- Assign vertices to Workers
- Validate Workers health
This happens N times
- Send messages to Workers
- Compute Aggregators
- Checkpoint
persistence
- It's a stateful computation
- Disk is hit only for checkpoints, if at all
- No sorting is necessary
- Only messages hit the network
messaging
- Assign messages to vertices
- Iterate on active vertices
- Call vertices compute()
computing