Hadoop Programming

A very quick overview of Hadoop programming using Pig, Hive, and Streaming.

Ray Toal

on 9 January 2013


Transcript of Hadoop Programming

Objectives

This overview will get you to:

Be able to use the hadoop command and perform the most common HDFS operations
Understand, pretty well, something called MapReduce
Be able to write applications using Hive, Pig, and Hadoop Streaming

What This Course Is About

This is a programming course, covering Hive, Pig, and Streaming but not the low-level Java Hadoop API or non-Apache frameworks such as Cascading.

It does not cover Hadoop history or administration, cloud computing, grid computing, EC2, or S3.

Hadoop Is...

An open-source project for scalable, reliable, distributed computing consisting of (1) Hadoop Common, (2) HDFS, (3) a scheduling and cluster management framework, and (4) a MapReduce system.

Used for batch processing jobs, not for real-time queries.

Why We Care

Big Data can take weeks to aggregate, ingest, process, or analyze on a traditional RDBMS.

Early attempts at clustering were insufficient (network and IO bottlenecks).

Hadoop (especially on elastic grids) solves these problems with: (1) a distributed file system with a very large block size, (2) centralized administration, and (3) a MapReduce programming model.

How it works

1. The input is broken up into as many chunks as there are mappers.
2. Each input record is transformed into zero or more records by its mapper.
3. The mapped records are partitioned by key (in the slide's example, evens and odds), one partition per reducer.
4. The reducers produce output files into an output directory. The job will fail if the output directory already exists.

Map, Filter, Reduce

One approach to efficient distributed computation comes from the world of functional programming: map, filter, and reduce.

map(square, [3, 4, 10, -5, 8]) ==> [9, 16, 100, 25, 64]
filter(odd, [3, 6, 9, -11, 4]) ==> [3, 9, -11]
reduce(plus, [3, -8, 1, 20]) ==> 3 + (-8) + 1 + 20 = 16

// Sum of squares of all odd numbers
a.filter(odd).map(square).reduce(plus);

Functional programming is inherently parallelizable because it is side-effect free (stateless).

Summary

Apache Hadoop is a software framework for running applications with petabytes or more of data on multiple nodes.
Hadoop programs are written using (1) the low-level Java Hadoop API, (2) Streaming, (3) Pig, or (4) Hive.

Choose Hive for ad-hoc (and generally read-only) queries, reports, and analytics.

Choose Pig for moderately complex processing, generally involving joins.

Choose Streaming for most join-free tasks that require some computation that is convenient to write in a full-fledged programming language.

Choose the Java API only when for some weird reason nothing else is fast enough.

This course only covered the very basics of programming Hadoop applications with Streaming, Pig, and Hive; not how to install, set up, configure, monitor (via job trackers and task trackers), or troubleshoot.

Hive

Is a data warehousing system for Hadoop
Is used for ad-hoc queries, data summaries and analysis
Comes with HiveQL, a declarative, SQL-like query language
Requires data to be described as tables, with a schema
Does NOT give you low-latency queries
Has a Getting Started Guide on the Hive wiki
Has a Language Manual worth reading, too

Pig

Pig is a system for processing Hadoop jobs using Pig Latin (a high-level dataflow language).
Like Hive, it hides a ton of underlying Hadoop complexity.
Pig comes with a shell (called grunt)
"In one study, 10 lines of Pig Latin = 200 lines of Java... What took 4 hours to write in Java took 15 minutes in Pig Latin." - Cloudera
Pig can be run in an interactive shell (called grunt), or you can run a script stored in a file, or you can bundle up a script and run it from Java.

Hadoop Streaming

Streaming is a utility that lets you run MapReduce jobs by writing your mapper and your reducer as stdin-to-stdout programs in any language.

drop table movies;
create external table if not exists movies (
    pid string,
    status string,
    type string,
    titles string,
    ratings string,
    locations string,
    box_office string
)
row format delimited
fields terminated by '\t'
location '/user/feeds/movies';

describe movies;

-- Count all movie records

select count(1)
from movies;

-- Dump movie records

select *
from movies
limit 10;

-- Get released movies

select *
from movies
where status='Released'
limit 10;

-- Which films came from Cuba?

select distinct pid
from movies
where origins like '%Cuba%'
limit 20;

-- Get all ratings

select distinct rating
from sample_content;

-- Find number of movies per status

select status, count(1)
from movies
group by status;

Users = LOAD 'users' AS (name, age);
Filtered = FILTER Users BY age >= 18 AND age <= 25;
Pages = LOAD 'pages' AS (user, url);
Joined = JOIN Filtered BY name, Pages BY user;
Grouped = GROUP Joined BY url;
Summed = FOREACH Grouped GENERATE group, COUNT(Joined) AS clicks;
Sorted = ORDER Summed BY clicks DESC;
TopFive = LIMIT Sorted 5;
STORE TopFive INTO 'topfivepages';

Content = LOAD '/user/training/content' AS (
    feed, content_vid, place_vid, type, pid, name, value);
Grouped = GROUP Content BY type PARALLEL 12;
Summed = FOREACH Grouped GENERATE group, COUNT(Content) AS counts;
DUMP Summed;

Content = LOAD '/user/training/content' AS (
    feed, content_vid, place_vid, type, pid, name, value);
Grouped = GROUP Content BY type PARALLEL 12;
Summed = FOREACH Grouped GENERATE group, COUNT(Content) AS counts;
STORE Summed INTO '/user/training/content-counts';

Grid Computing and Hadoop

You can practice with Hadoop on your own machine, but generally MapReduce jobs run on a cluster. The file system (HDFS) is distributed across nodes.

HDFS is distributed, scalable, and redundant, and optimized for:

A modest number of huge files (over 100GB)
Huge block sizes (at LEAST 64MB)
Writes by appending
Reads that are huge, streaming
High throughput, not low latency
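
To get a feel for those numbers, here is a rough back-of-the-envelope sketch in Python. The 100 GB file size and 64 MB block size come from the list above. The replication factor of 3 is an assumption (it is the usual HDFS default, but the slides do not state it), and the function name is made up for illustration.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3

def hdfs_footprint(file_bytes, block_bytes=64 * MB, replication=3):
    """Return (block count, raw bytes stored across the cluster).

    replication=3 is an assumption (the usual HDFS default),
    not a figure taken from the slides.
    """
    blocks = math.ceil(file_bytes / block_bytes)  # last block may be partial
    raw_bytes = file_bytes * replication          # every block is copied
    return blocks, raw_bytes

blocks, raw = hdfs_footprint(100 * GB)
print(blocks)     # 1600
print(raw // GB)  # 300
```

A single 100 GB file is therefore only about 1600 blocks, which is why a modest number of huge files suits HDFS better than millions of tiny ones.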

Command line:
hadoop fs -SUBCOMMAND -OPTIONS ARGS

hadoop
hadoop fs
hadoop fs -ls /user/alice
hadoop fs -lsr /user/alice
hadoop fs -mkdir testing
hadoop fs -rmr testing
hadoop fs -copyFromLocal films.csv films
hadoop fs -cat films | head -n 10
hadoop fs -lsr report
hadoop fs -copyToLocal report/part* .

Content = LOAD '/user/training/content' AS (
    feed, content_vid, place_vid, type, pid,
    name, value);
DESCRIBE Content;

Content = LOAD '/user/training/content' AS (
    feed, content_vid, place_vid, type, pid,
    name, value);
PlaceIds = FOREACH Content GENERATE pid;
DistinctPlaceIds = DISTINCT PlaceIds PARALLEL 12;
Grouped = GROUP DistinctPlaceIds ALL PARALLEL 12;
Counted = FOREACH Grouped GENERATE COUNT(DistinctPlaceIds);
DUMP Counted;

Pig Latin

Commands: cat, cd, copyFromLocal, copyToLocal, cp, ls, mkdir, mv, pwd, rm, rmf, kill, help, quit, set

Types: int, long, float, double, chararray, bytearray, tuple, bag, map

Built-in Functions: AVG, CONCAT, COUNT, DIFF, MIN, MAX, SIZE, SUM, TOKENIZE, IsEmpty, PigStorage, BinStorage, BinaryStorage, TextLoader, PigDump

Other categories: Diagnostic Operators, UDF Definition, Relational Operators

OH! With streaming, you can test outside of hadoop! So cool!

head -n 100 beetles | ./word-count-mapper.rb | sort | ./word-count-reducer.rb

Good To Know

Map-Only Job:
-numReduceTasks 0

Reduce-Only Job:
-mapper '/bin/cat'

Sometimes you have to chain jobs.

Joins are possible with streaming but take some thought.

MAPREDUCE-577 is a nasty bug that exists in 1.0.4, meaning you should preprocess XML.

Full streaming docs:
http://hadoop.apache.org/docs/r1.0.4/streaming.html

More Demos

hadoop jar ${HADOOP_HOME}/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.task.timeout=3600000 \
-D mapred.map.tasks=${NUM_MAP_TASKS} \
-D mapred.reduce.tasks=${NUM_RED_TASKS} \
-D stream.non.zero.exit.is.failure=true \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-k1 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input ${HDFS_TRUTHFILE}/part-* \
-mapper " awk -F'\t' -f add_match_key.awk " \
-reducer " java -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dfile.encoding=UTF-8 -Xmx768m -cp ./lib/match-1.0.1-jar-with-dependencies.jar com.example.match.MatchReducer " \
-file ${MASTER_HOME}/match/lib/match-1.0.1-jar-with-dependencies.jar \
-file ${MASTER_HOME}/match/scripts/add_match_key.awk \
${print_verbose}

A real-life example

Related Projects

HBase - the Hadoop Database

Mahout - a machine learning library for big data

Chukwa - for data collection for monitoring

Hama - for massive scientific computations

ZooKeeper - distributed coordination and synchronization service

Avro - data serialization, competes with Thrift and Protocol Buffers

Oozie - a workflow engine

JavaScript Example

-- How to write to an output directory

insert overwrite directory '/user/reports/status-counts'
select status, count(1)
from movies
group by status;

MapReduce in Hadoop

Hadoop's MapReduce doesn't have an explicit filter() call.
Instead, each map() call produces 0, 1, or more results. (See how this filters?)
Also, all items in the MapReduce flow are key-value pairs.
Between map and reduce, Hadoop will sort for you!

Word Count: The Hello, World of MapReduce

# Pseudocode (Python-style)

def mapper(_, line):
    return [(w.lower(), 1) for w in line.split()]

def reducer(word, counts):
    return sum(counts)

Example using Cloudera's distro on Amazon EC2

It's a little different ...

Things are just a little different with streaming.

Mappers and reducers operate on text file lines, not data structures
Mapper output and reducer input by default is assumed to be TSV with the first column being that special key!
Reducer inputs are NOT grouped into (key, valuelist) but instead the keys are repeated.
You can do some configuring of course but the defaults aren't too bad.

Streaming Keys/Values?

THE ONLY KEY THAT REALLY MATTERS IS K2! And beware of the curse of the last reducer...

Word Count Diagram for Streaming

Word Count with Ruby Streaming

Mapper

#!/usr/bin/ruby
# Emit "word<TAB>1" for every word on every input line
ARGF.each do |line|
  line.downcase.scan(/[a-z']+/).each do |word|
    puts "#{word}\t1"
  end
end

Reducer

#!/usr/bin/ruby
# Add up the counts for each word, then print the totals
counts = Hash.new 0
ARGF.each do |line|
  word, count = line.split("\t")
  counts[word] += count.to_i
end
counts.each {|k,v| puts "#{k}\t#{v}"}

Running the Streaming Word Count App

#!/bin/bash
hadoop jar streaming.jar \
-input beetles \
-output beetles-count \
-mapper word-count-mapper.rb \
-reducer word-count-reducer.rb \
-file word-count-mapper.rb \
-file word-count-reducer.rb \
-numReduceTasks 3

Demo

$ ./word-count
$ hadoop fs -ls beetles-count
$ hadoop fs -cat beetles-count/par*

Completely trivial streaming example
Counting all the records in a data set

hadoop jar streaming.jar \
-input movies \
-output how-many-movies \
-mapper '/bin/cat' \
-reducer '/usr/bin/wc -l'

Too trivial for anyone to care... Seriously, how cool is that?

ruby -ne "puts \$_.downcase.scan(/[A-Z']+/i)" beetles|sort|uniq -c

A mapper-only job to preprocess XML into CSV

A simple example with joins (next time)

Faster in Mongo :)
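
Because Streaming only cares about stdin and stdout, the Ruby word count shown earlier translates readily to other languages. What follows is a minimal Python sketch of the same idea, assuming the default tab-separated key/value lines and the sorted, repeated-key reducer input described in the streaming notes. The function names and the local sorted() call that stands in for Hadoop's sort step are illustrative only and are not part of Hadoop.

```python
import re
from itertools import groupby

def map_lines(lines):
    """Mapper: emit one 'word<TAB>1' line per word found in the input."""
    for line in lines:
        for word in re.findall(r"[a-z']+", line.lower()):
            yield f"{word}\t1"

def reduce_lines(sorted_lines):
    """Reducer: keys arrive repeated (not pre-grouped), so group adjacent
    lines that share a key and sum their counts."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

def run_locally(lines):
    """Local stand-in for `mapper | sort | reducer` (no Hadoop involved)."""
    return list(reduce_lines(sorted(map_lines(lines))))

if __name__ == "__main__":
    for out in run_locally(["Love me do", "love, love me do"]):
        print(out)
```

Unlike the Ruby reducer, which keeps every word in a hash until the end, this version relies on the input being sorted and only holds one key at a time. That is a design choice for the sketch, not something the slides prescribe.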