
PageRank will be our example!

Phases of a MapReduce job

This happens N times...

In 2010 Google introduces Pregel

  • Based on Valiant's BSP
  • Batch-oriented processing
  • Vertex-centric, high-level programming model
  • Computation happens in-memory
  • Runs on own infrastructure
  • Master/slave architecture

Processes are two-state automata

Computation stops when all processes are halted
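
A minimal sketch of the two-state behaviour described above. The names VertexState and VertexAutomaton are hypothetical and are not Giraph classes; the rule that an incoming message wakes a halted process up again is taken from the Pregel model.

```java
import java.util.List;

// Hypothetical names for illustration; not part of the Giraph API.
enum VertexState { ACTIVE, HALTED }

class VertexAutomaton {
    private VertexState state = VertexState.ACTIVE; // every vertex starts active

    // The vertex declares that it has no more work to do.
    void voteToHalt() { state = VertexState.HALTED; }

    // In the Pregel model, receiving a message reactivates a halted vertex.
    void onMessage() { state = VertexState.ACTIVE; }

    boolean isHalted() { return state == VertexState.HALTED; }

    // The computation can stop once every vertex is halted.
    static boolean allHalted(List<VertexAutomaton> vertices) {
        return vertices.stream().allMatch(VertexAutomaton::isHalted);
    }
}
```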

You want an example? Find the maximum value!

It comes with a simple Vertex-centric API

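abstract class Vertex<I, V, E, M>
{
    // You implement this; it is called with the messages sent to this vertex.
    abstract void compute(Iterable<M> messages);

    // Signatures only: the methods below are available inside compute().
    long superstep();
    I getVertexID();
    V getVertexValue();
    void setVertexValue(V value);
    Iterable<I> getOutNeighbors();
    void sendMessageTo(I dest, M msg);
    void voteToHalt();
}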

  • You define VertexInputFormat
  • You define VertexOutputFormat
  • You define a Combiner
  • You define an Aggregator
  • You have a Graph Mutator API
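
To illustrate what a Combiner is for, here is a minimal sketch that collapses all messages addressed to one vertex into a single message, which is enough for the maximum-value example. The MessageCombiner interface and the class names are assumptions made for this sketch; Giraph's actual Combiner API may look different.

```java
import java.util.List;

// Hypothetical interface for illustration; Giraph's real Combiner API may differ.
interface MessageCombiner<M> {
    M combine(M first, M second);
}

// For the maximum-value example only the largest message matters.
class MaxCombiner implements MessageCombiner<Long> {
    @Override
    public Long combine(Long first, Long second) {
        return Math.max(first, second);
    }
}

class CombinerDemo {
    // Collapse a non-empty list of messages for one vertex into a single message.
    static <M> M combineAll(MessageCombiner<M> combiner, List<M> messages) {
        M result = messages.get(0);
        for (M message : messages.subList(1, messages.size())) {
            result = combiner.combine(result, message);
        }
        return result;
    }
}
```

Combining before sending means one message per destination vertex crosses the network instead of many.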

MaxValue pseudo-code

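class MaxValueVertex<I, V extends Comparable<V>, E> extends Vertex<I, V, E, V>
{
    void compute(Iterable<V> messages) {
        // Start from the current value and fold in every incoming message.
        V maxValue = getVertexValue();
        for (V message : messages) {
            if (message.compareTo(maxValue) > 0) {
                maxValue = message;
            }
        }
        // Send in the first superstep, and afterwards only when the value grew;
        // otherwise no message is ever sent and every vertex halts at once.
        if (superstep() == 0 || maxValue.compareTo(getVertexValue()) > 0) {
            setVertexValue(maxValue);
            for (I endpoint : getOutNeighbors()) {
                sendMessageTo(endpoint, getVertexValue());
            }
        }
        voteToHalt();
    }
}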
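
The pseudo-code above needs a framework around it to run. As a rough illustration, the following single-process simulation plays the same algorithm over supersteps using plain arrays and lists. It is a sketch, not Giraph code: the class MaxValueSimulation and its methods are hypothetical, and message delivery is simply modelled by swapping an outbox for an inbox at the end of each superstep.

```java
import java.util.ArrayList;
import java.util.List;

// Single-process simulation of the max-value example; not Giraph code.
class MaxValueSimulation {
    // edges.get(v) lists the out-neighbors of vertex v; initial[v] is its starting value.
    static long[] run(List<List<Integer>> edges, long[] initial) {
        int n = initial.length;
        long[] values = initial.clone();
        boolean[] halted = new boolean[n];
        List<List<Long>> inbox = emptyBoxes(n);

        for (long superstep = 0; ; superstep++) {
            List<List<Long>> outbox = emptyBoxes(n);
            boolean anyActive = false;
            for (int v = 0; v < n; v++) {
                // A halted vertex runs again only if it received messages.
                if (halted[v] && inbox.get(v).isEmpty()) continue;
                anyActive = true;
                long max = values[v];
                for (long msg : inbox.get(v)) max = Math.max(max, msg);
                // Send in superstep 0, and afterwards only when the value grew.
                if (superstep == 0 || max > values[v]) {
                    values[v] = max;
                    for (int dest : edges.get(v)) outbox.get(dest).add(max);
                }
                halted[v] = true; // equivalent of voteToHalt()
            }
            if (!anyActive) return values; // all halted and no messages in flight
            inbox = outbox; // barrier: messages arrive in the next superstep
        }
    }

    private static List<List<Long>> emptyBoxes(int n) {
        List<List<Long>> boxes = new ArrayList<>();
        for (int i = 0; i < n; i++) boxes.add(new ArrayList<>());
        return boxes;
    }
}
```

On a directed ring of four vertices holding 3, 6, 2 and 1, every vertex ends up with the value 6 once the maximum has travelled around the ring.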

Master-slave architecture

Anatomy of a Superstep

Cool! Where do I download it?

Questions?

http://incubator.apache.org/giraph

http://issues.apache.org/jira/browse/GIRAPH

claudio@apache.org

@claudiomartella

Current and Future Work

  • Out-of-core messages
  • First official 0.1 release
  • Algorithms library
  • More integration: HCatalog? HBase?
  • More contributors!

Fault-tolerance through checkpointing

At user-defined intervals

[Figure: superstep timeline (S1 to S5) with a "Failure!" marker]
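
A minimal sketch of checkpointing at a user-defined interval. Everything here is an assumption made for illustration: the class CheckpointPolicy is hypothetical, supersteps are numbered from 0, and a checkpoint is taken at the start of every superstep whose number is a multiple of the interval. Giraph's actual checkpointing logic may differ.

```java
// Hypothetical helper for illustration; not Giraph's checkpointing code.
class CheckpointPolicy {
    private final long interval; // user-defined: checkpoint every `interval` supersteps

    CheckpointPolicy(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // True when the state should be saved before running this superstep.
    boolean shouldCheckpoint(long superstep) {
        return superstep % interval == 0;
    }

    // After a failure during `failedSuperstep`, resume from the latest checkpointed superstep.
    long restartFrom(long failedSuperstep) {
        return (failedSuperstep / interval) * interval;
    }
}
```

A shorter interval costs more disk writes but means fewer supersteps have to be recomputed after a failure.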

Who am I?


Apache Giraph: Distributed Graph Processing in the Cloud

Giraph is a loose implementation of Pregel, a distributed graph processing framework complementary to MapReduce, which is not well suited for graph algorithms.

Apache Girapher: PPMC Member & Committer

PhD Student: LSDS Group, Vrije University of Amsterdam

Interested in Complex Networks & Distributed and Scalable Infrastructures

Email: claudio@apache.org

Blog: http://blog.acaro.org

Twitter: @claudiomartella

Github: https://github.com/claudiomartella (dbpedia4neo etc.)

FOSDEM 2012, Graph Processing Room

5 February 2012

Claudio Martella

Clear?

Let's see the problem...

Worker

Master

Responsible for Coordination:

  • Assigns Partitions to Workers
  • Coordinates synchronization
  • Requests checkpoints
  • Aggregates Aggregator values
  • Collects health statuses
  • etc.

TaskTracker

Worker

A new job is started...

Drawbacks:

  • Job bootstrap at every iteration
  • Disk is hit ~6 times
  • Data is sorted
  • Graph structure is passed through every job

Data is sent to Reducers (and hits the disk...)

Worker

TaskTracker

JobTracker & NameNode

Data is scanned from local disk:

Key-Value: < nodeID, Node >

Node: < PageRank, [nodeID_i, nodeID_k, ..., nodeID_n] >

Worker

Mapper emits intermediate values

  • MyPageRank / #Neighbors
  • Key-Value graph structure

Data is sorted to disk...

Reducer emits to (remote) disk...

  • Sums the PRs from neighbors
  • Joins it with graph structure
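
The map and reduce steps above can be imitated in plain Java to see what a single PageRank iteration computes. This is only a sketch: no Hadoop APIs are used, the class PageRankIteration and its Node type are hypothetical, a TreeMap stands in for the sort and shuffle, and the damping factor of the full PageRank formula is left out because the slides only mention summing the contributions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java imitation of one MapReduce PageRank iteration; no Hadoop APIs are used.
class PageRankIteration {
    // Node: < PageRank, [nodeID_i, ..., nodeID_n] >
    static final class Node {
        final double pageRank;
        final List<Integer> neighbors;

        Node(double pageRank, List<Integer> neighbors) {
            this.pageRank = pageRank;
            this.neighbors = neighbors;
        }
    }

    static Map<Integer, Node> iterate(Map<Integer, Node> graph) {
        // "Map" phase: each node emits PageRank / #neighbors to every neighbor.
        Map<Integer, List<Double>> shuffled = new TreeMap<>(); // stands in for sort + shuffle
        for (Node node : graph.values()) {
            for (int dest : node.neighbors) {
                shuffled.computeIfAbsent(dest, k -> new ArrayList<>())
                        .add(node.pageRank / node.neighbors.size());
            }
        }
        // "Reduce" phase: sum the contributions and join them with the graph structure.
        Map<Integer, Node> next = new TreeMap<>();
        for (Map.Entry<Integer, Node> entry : graph.entrySet()) {
            double sum = 0.0;
            for (double c : shuffled.getOrDefault(entry.getKey(), List.of())) {
                sum += c;
            }
            next.put(entry.getKey(), new Node(sum, entry.getValue().neighbors));
        }
        return next;
    }
}
```

Calling iterate() once per iteration mirrors the slides' point: the whole graph structure has to flow through every iteration, not just the PageRank values.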

Zookeeper

Responsible for Computation State:

  • Partition - Worker mapping
  • Global state: #superstep...
  • Checkpoints paths
  • Aggregator Values
  • Statistics
  • etc.

Worker

void map()
{
    superstep = 0;
    vertices = loadPartitionFromHDFS(path);
    do {
        // Run compute() on every vertex that is still active.
        for (Vertex v: vertices.getActive()) {
            v.compute();
        }
        sendMessagesToWorkers();
        checkpointIfNecessary(superstep);
        synchronize();  // barrier between supersteps
        superstep++;
    } while (superstep < MAX_SUPERSTEP && !allHalted(vertices));
    dumpResultsToHDFS(vertices);
}

It runs on Hadoop:

  • in-memory
  • leverages 100% of existing clusters
  • it's a single Map-only job
  • can be a job in a pipeline: M/R, Hive?
  • fault-tolerant
  • Zookeeper for state, No SPOF
  • Incubated since summer 2011
  • Written in Java
  • Implements Pregel's API (loosely...)
  • Runs on existing MapReduce infrastructure
  • Active community from Yahoo!, Facebook, LinkedIn, Twitter ...

Apache Giraph

  • There's Giraph
  • There's Hama
  • There's GoldenOrb
  • There's Signal/Collect

It's proprietary... but!

[Figure: Master and Workers]

Vertices are assigned to Workers

  • by hash partitioning
  • by range partitioning
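
The two assignment strategies above can be sketched as follows. The Partitioners class and its method names are hypothetical and are not Giraph's partitioner classes; vertex IDs are assumed to be non-negative longs for the range variant.

```java
// Hypothetical helpers for illustration; not Giraph's partitioner classes.
class Partitioners {
    // Hash partitioning: spread vertex IDs across workers by hash code.
    static int hashPartition(long vertexId, int numWorkers) {
        return Math.floorMod(Long.hashCode(vertexId), numWorkers);
    }

    // Range partitioning: contiguous blocks of IDs in [0, maxId] go to the same worker.
    static int rangePartition(long vertexId, long maxId, int numWorkers) {
        long rangeSize = (maxId + numWorkers) / numWorkers; // ceil((maxId + 1) / numWorkers)
        return (int) (vertexId / rangeSize);
    }
}
```

Hash partitioning tends to balance load without knowing the ID space, while range partitioning keeps neighboring IDs on the same Worker, which can help when IDs reflect locality in the graph.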

Master assigns and coordinates, while Workers execute vertices and communicate with each other directly

Here are the answers!

  • Load the graph from disk
  • Assign vertices to Workers
  • Validate Workers' health

This happens N times

  • Send messages to Workers
  • Compute Aggregators
  • Checkpoint

persistence

  • It's a stateful computation
  • Disk is hit only for checkpoints, if enabled
  • No sorting is necessary
  • Only messages hit the network

messaging

  • Assign messages to vertices
  • Iterate on active vertices
  • Call each vertex's compute()

computing
