Apache Giraph: Distributed Graph Processing in the Cloud

GraphDevroom@FOSDEM presentation for Apache Giraph

Claudio Martella

on 4 February 2012

Transcript of Apache Giraph: Distributed Graph Processing in the Cloud

Apache Giraph: Distributed Graph Processing in the Cloud
FOSDEM 2012, Graph Processing Room
5 February 2012

Claudio Martella
Apache Girapher: PPMC Member & Committer
PhD Student: LSDS Group, Vrije Universiteit Amsterdam

Interested in Complex Networks & Distributed and Scalable Infrastructures

Email: claudio@apache.org
Blog: http://blog.acaro.org
Twitter: @claudiomartella
Github: https://github.com/claudiomartella (dbpedia4neo etc.)

Who am I?

Giraph is a loose implementation of Pregel, which is a distributed graph processing framework complementary to MapReduce, which is not well suited for graph algorithms.
PageRank will be our example!

Let's see the problem... Clear?

Data is sorted to disk...
A new job is started...
Data is scanned from local disk: Mapper emits intermediate values
This happens N times...

Key-Value: < nodeID, Node >

Node: < PageRank, [nodeID_i, nodeID_k, ..., nodeID_n] >

Phases of a MapReduce job
Mapper emits: MyPageRank / #Neighbors, plus the Key-Value graph structure
Reducer: sums the PRs from neighbors and joins the result with the graph structure

Drawbacks:
Job bootstrap
Disk is hit ~6 times
Data is sorted
Graph is passed through

In 2010 Google introduces Pregel
Based on Valiant's BSP
Batch-oriented processing
Vertex-centric High-level programming model
Computation happens in-memory
Runs on own infrastructure
Master/slave architecture

You want an example? Find the maximum value!
Processes are two-state automata
Computation stops when all processes are halted

Cool! Where do I download it?
It's proprietary... but!
There's Giraph
There's Hama
There's GoldenOrb
There's Signal/Collect

Apache Giraph
Incubated since summer 2011
Written in Java
Implements Pregel's API (loosely...)
Runs on existing MapReduce infrastructure
Active community from Yahoo!, Facebook, LinkedIn, Twitter ...

It comes with a simple vertex-centric API

abstract class Vertex<I, V, E, M> {
    abstract void compute(Iterable<M> messages);
    long superstep();
    I getVertexID();
    V getVertexValue();
    void setVertexValue(V value);
    Iterable<I> getOutNeighbors();
    void sendMessageTo(I dest, M msg);
    void voteToHalt();
}

MaxValue pseudo-code

class MaxValueVertex extends Vertex<I, V, E, M> {
    void compute(Iterable<M> messages) {
        // Largest value among the current one and the incoming messages
        V maxValue = getVertexValue();
        for (M message : messages) {
            V msgValue = message.getValue();
            if (msgValue.compareTo(maxValue) > 0) {
                maxValue = msgValue;
            }
        }
        // Adopt a larger value; propagate it, and announce the initial value once
        boolean changed = maxValue.compareTo(getVertexValue()) > 0;
        if (changed) setVertexValue(maxValue);
        if (changed || superstep() == 0) {
            for (I endpoint : getOutNeighbors()) {
                sendMessageTo(endpoint, maxValue);
            }
        }
        voteToHalt(); // a halted vertex wakes up only when a message arrives
    }
}

You define VertexInputFormat
You define VertexOutputFormat
You define a Combiner
You define an Aggregator
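
The roles of a Combiner and an Aggregator can be illustrated with a small sketch. It does not use Giraph's real classes: `CombinerAggregatorSketch`, `combine` and `aggregate` are hypothetical names chosen for this example. The assumption is that a combiner merges all messages addressed to the same vertex into one before they are sent, and that an aggregator reduces per-vertex contributions into a single global value per superstep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; these are not Giraph's Combiner/Aggregator classes.
class CombinerAggregatorSketch {

    // Combiner: keep only the maximum message per destination vertex.
    // Each message is a pair {destinationVertexId, value}.
    static Map<Long, Long> combine(List<long[]> messages) {
        Map<Long, Long> combined = new TreeMap<>();
        for (long[] m : messages) {
            combined.merge(m[0], m[1], Math::max);
        }
        return combined;
    }

    // Aggregator: a global sum over the values contributed by vertices.
    static long aggregate(Iterable<Long> contributions) {
        long sum = 0;
        for (long c : contributions) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<long[]> outbox = new ArrayList<>();
        outbox.add(new long[] {1, 3});
        outbox.add(new long[] {1, 6});
        outbox.add(new long[] {2, 2});
        Map<Long, Long> combined = combine(outbox);
        System.out.println(combined);                     // three messages become two
        System.out.println(aggregate(combined.values())); // global sum of the combined values
    }
}
```

For the maximum-value example a max combiner is safe because only the largest incoming value matters to the receiving vertex, so dropping the smaller ones does not change the result.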
You have a Graph Mutator API

Anatomy of a Superstep (this happens N times)
Diagram labels: messaging, persistence, computing

Load the graph from disk
Assign vertices to Workers
Validate Workers health

Assign messages to vertices
Iterate on active vertices
Call vertices compute()

Send messages to Workers
Compute Aggregators
Checkpoint

[Architecture diagram: ZooKeeper; JobTracker & NameNode; four TaskTrackers; one Master and seven Workers]

void map() {
    superstep = 0;
    vertices = loadPartitionFromHDFS(path);
    do {
        for (Vertex v : vertices.getActive()) {
            v.compute(); // loop body is missing in the transcript; the superstep outline says compute() is called here
        }
        superstep++; // assumed: advance once all active vertices have run
    } while (superstep < MAX_SUPERSTEP && !allHalted(vertices));
}

ZooKeeper is responsible for computation state:
Partition - Worker mapping
Global state: #superstep...
Checkpoints paths
Aggregator Values
etc.

Master is responsible for coordination:
Assigns Partitions to Workers
Coordinates synchronization
Requests checkpoints
Aggregates Aggregators values
Collects health statuses
etc.

It runs on Hadoop:
leverages 100% of existing clusters
it's a single Map-only job
can be a job in a pipeline: M/R, Hive?
ZooKeeper for state, no SPOF

Fault-tolerance through checkpointing
At user-defined intervals
[Diagram: superstep timelines S1 S2 S3 S4 and S2 S3 S4 S5, with a "Failure!" marker]

Current and Future Work
Out-of-core messages
First official 0.1 release
Algorithms library
More integration: HCatalog? HBase?
More contributors!

Questions?
http://incubator.apache.org/giraph
@claudiomartella

Data is sent to Reducers (and hits the disk...)
Reducer emits to (remote) disk...

Vertices are assigned to Workers
by hash partitioning
by range partitioning

Master-slave architecture
[Diagram: one Master and three Workers]
Master assigns and coordinates, while Workers execute vertices and communicate with each other directly

Here are the answers!
It's a stateful computation
Disk is hit only for checkpoints
No sorting is necessary
Only messages hit the network
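
To make these points concrete, here is a minimal single-process sketch of the maximum-value example run as a sequence of supersteps. It is an illustration under stated assumptions, not Giraph code: `BspMaxValueSketch`, `run` and `emptyBoxes` are hypothetical names, vertices are identified by array index, and the "network" is just an in-memory outbox. Vertex values stay in memory between supersteps, nothing is sorted, and the only data that moves is the messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical single-process simulation of BSP supersteps; not Giraph's API.
class BspMaxValueSketch {

    // values[i] is the state of vertex i; edges[i] lists its out-neighbors.
    // Updates values in place and returns the number of supersteps executed.
    static int run(long[] values, int[][] edges) {
        int n = values.length;
        List<List<Long>> inbox = emptyBoxes(n);
        boolean[] halted = new boolean[n];
        int superstep = 0;
        while (true) {
            List<List<Long>> outbox = emptyBoxes(n);
            boolean anyActive = false;
            for (int v = 0; v < n; v++) {
                // A halted vertex is reactivated only by an incoming message.
                if (halted[v] && inbox.get(v).isEmpty()) {
                    continue;
                }
                anyActive = true;
                long max = values[v];
                for (long msg : inbox.get(v)) {
                    max = Math.max(max, msg);
                }
                boolean changed = max > values[v];
                values[v] = max; // state is kept in memory across supersteps
                if (superstep == 0 || changed) {
                    for (int dest : edges[v]) {
                        outbox.get(dest).add(max); // only messages are exchanged
                    }
                }
                halted[v] = true; // vote to halt
            }
            if (!anyActive) {
                return superstep; // every vertex halted and no messages in flight
            }
            inbox = outbox; // barrier: messages are delivered in the next superstep
            superstep++;
        }
    }

    static List<List<Long>> emptyBoxes(int n) {
        List<List<Long>> boxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            boxes.add(new ArrayList<>());
        }
        return boxes;
    }

    public static void main(String[] args) {
        long[] values = {3, 6, 2, 1};
        int[][] edges = {{1}, {0, 2}, {1, 3}, {2}}; // a chain 0 - 1 - 2 - 3
        run(values, edges);
        System.out.println(Arrays.toString(values)); // prints [6, 6, 6, 6]
    }
}
```

The loop ends exactly when the condition from the Pregel slide holds: all vertices have voted to halt and no messages remain to wake them up.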