Processing Large Datasets with Hadoop and Python

July 31-Aug 1, PyOhio 2010

William McVey

on 1 August 2010

Transcript of Processing Large Datasets with Hadoop and Python

Processing Large Datasets with Hadoop and Python

Hadoop
Open source implementation of Google's MapReduce, BigTable (as HBase), and Google Filesystem papers
Top-level Apache project
Provides distributed and replicated filesystem
Processing data is primarily through map/reduce jobs
Used extensively by folks like:

Implemented in Java

Processing Large Datasets
Map/Reduce
Distributed Filesystem

Cisco Systems Applied Security Research
We collect:
Honeypot data
PCAP files
Logfiles from hosts, routers, and other inline devices
Netflow summaries

Hadoop Filesystem
Standard block based filesystem
Rather than disk blocks, blocks are (generally) network resources
Backends can be HDFS, Amazon S3, or other gateway (such as FTP or HTTP)
Built in (configurable) replication
Intended for very large files (Gigs to Terabytes)

HDFS
Default implementation of the Hadoop Filesystem
Participating nodes in a cluster run the DataNode daemon
Other nodes, running the NameNode daemon, track filename to datablock mappings

Read from a Hadoop Filesystem
Image from: Hadoop: The Definitive Guide

Writing a File
Image from: Hadoop: The Definitive Guide

Map/Reduce
Technique popularized by Google for parallel processing
Split a problem and (largish) input data across multiple machines (map phase)
Run the calculation on the distributed set of systems
Return the results (reduce phase)

Map/Reduce in Hadoop
In Hadoop, the data is already distributed!

Just need to push out jobs to run against the data records stored on individual DataNodes
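
The split, compute, and combine flow described above can be imitated on a single machine. The following is a minimal sketch in plain Python, not Hadoop's API: the record format (source host, byte count) and the names map_record, reduce_group, and run_job are made up for illustration.

```python
from collections import defaultdict

# Hypothetical netflow-like records: (source host, bytes transferred).
RECORDS = [
    ("10.0.0.1", 1200),
    ("10.0.0.2", 300),
    ("10.0.0.1", 800),
    ("10.0.0.3", 50),
    ("10.0.0.2", 700),
]


def map_record(record):
    """Map phase: emit (key, value) pairs for one input record."""
    host, nbytes = record
    yield host, nbytes


def reduce_group(key, values):
    """Reduce phase: combine every value emitted for one key."""
    return key, sum(values)


def run_job(records, mapper, reducer):
    """Run mapper over each record, group by key, then reduce each group.

    On a real cluster the map calls would run on the nodes that already
    hold the data; here everything runs in one process.
    """
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return dict(reducer(key, values) for key, values in sorted(groups.items()))


totals = run_job(RECORDS, map_record, reduce_group)
print(totals)
```

The mapper and reducer never see the whole dataset at once, which is what lets the same two functions be spread over many machines.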

Map/reduce jobs are coordinated by a JobTracker daemon and processed by TaskTracker daemons

Accessing the Filesystem
To access/manipulate the data, you can:
Use `hadoop fs` command
Use `ftp` interface
Use FUSE

hadoop@seeker-util3:~$ hadoop fs -ls /
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2010-07-09 14:21 /hbase
drwxr-xr-x - hadoop supergroup 0 2010-06-08 11:08 /usr
hadoop@seeker-util3:~$ hadoop fs -df /
Filesystem Size Used Avail Use%
/ 150909288448 434176 137249853440 0%
wam@seeker-util3:~$ df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 36G 1.4G 32G 5% /
none 497M 168K 497M 1% /dev
none 501M 0 501M 0% /dev/shm
none 501M 76K 501M 1% /var/run
none 501M 0 501M 0% /var/lock
none 501M 0 501M 0% /lib/init/rw
none 36G 1.4G 32G 5% /var/lib/ureadahead/debugfs
fuse_dfs 141G 0 141G 0% /hadoop-hdfs
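
With a FUSE mount like the fuse_dfs entry in the df output, HDFS paths look like ordinary files to Python. A minimal sketch, assuming the /hadoop-hdfs mount point from that output; the helper name tree_usage is hypothetical, and because it uses only the standard library it behaves the same on any directory.

```python
import os


def tree_usage(root):
    """Return (file_count, total_bytes) for every regular file under root."""
    file_count = 0
    total_bytes = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if os.path.isfile(path):
                file_count += 1
                total_bytes += os.path.getsize(path)
    return file_count, total_bytes


if __name__ == "__main__":
    # /hadoop-hdfs is the fuse_dfs mount point from the df output above.
    mount = "/hadoop-hdfs"
    if os.path.isdir(mount):
        print(tree_usage(mount))
```

Walking a very large tree this way is slow compared with a map/reduce job, so it suits spot checks rather than bulk processing.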
But wait a minute...

Is the filesystem abstraction the best there is for managing large datasets?

Look behind the curtain...
Hint: databases were developed for a reason.

William McVey <wam@cisco.com>

Our Data Processing Requirements
Often we don't know what data we need until months after it was collected
Ever-increasing amount and rate of storage requirements
Small group of researchers dealing with a few terabytes of data now, more before long

HBase
A column-oriented datastore built on top of the Hadoop distributed filesystem.

But wait...

There is always Jython...
or language agnostic gateways: Avro, Thrift, Stargate

Stargate
A RESTful web service on HBase

HTTP methods define the access to the data

"Accept" header specifies format for results:
text/plain (default)
application/x-protobuf

Requesting a "content" table's schema:
curl http://localhost:8000/content/schema

HTTP/1.1 200 OK
Content-Length: 639
Cache-Control: no-cache
Content-Type: text/plain

{ NAME => 'content', IS_META => 'false', IS_ROOT => 'false', COLUMNS => [
{ NAME => 'content', BLOCKSIZE => '65536', BLOOMFILTER => 'false', BLOCKCACHE => 'false', COMPRESSION => 'GZ', LENGTH => '2147483647', VERSIONS => '1', TTL => '-1', IN_MEMORY => 'false' },
{ NAME => 'info', BLOCKSIZE => '65536', BLOOMFILTER => 'false', BLOCKCACHE => 'false', COMPRESSION => 'NONE', LENGTH => '2147483647', VERSIONS => '1', TTL => '-1', IN_MEMORY => 'false' },
{ NAME => 'url', BLOCKSIZE => '65536', BLOOMFILTER => 'false', BLOCKCACHE => 'false', COMPRESSION => 'NONE', LENGTH => '2147483647', VERSIONS => '1', TTL => '-1', IN_MEMORY => 'false' } ] }

Requesting results for a particular index:
curl -H "Accept: application/json" http://localhost:8000/content/00012614f7d43df6418523445a6787d6/content:raw

HTTP/1.1 200 OK
Cache-Control: max-age=14400
Content-Type: application/json
Transfer-Encoding: chunked

29tL2R1bmNhbnJpbGV5Ij5oZXJlPC9hPi48L3A+CjwvYm9keT48L2h0bWw+Cg=="} } }

Cons to Using Stargate
By most accounts, it's much slower than the alternative gateways

Development on this gateway has just about completely stopped

Thrift
A cross language communication framework
C++, Java, Python, Ruby, Erlang, Perl, C#
PHP, Haskell and more
Initially developed at Facebook
Open sourced in 2007
Apache incubator project in 2008
- http://incubator.apache.org/thrift/

Provides an RPC mechanism, similar to XDR/RPC

Avro
The new hotness. A recent graduate to top level Apache project
A fast binary data serialization system
Rich data structures (e.g. nested dictionaries)
RPC exchange of Avro messages
Easy scripting language integration

Cons of using Avro
It's REALLY new.

Only available in development snapshots of HBase

So I use...

Some HBase Features

Highly Scalable
As new capacity is needed, just add machines to the cluster
Rows are kept sorted within a block
As existing blocks fill and need to be rebalanced, the new machines automatically begin to be used.
If you need more redundancy/parallelism, turn up replication on the underlying HDFS

Dynamic Columns
A typical relational db requires the data model to be pre-declared and mostly static.

HBase allows dynamic columns to be created, grouped within pre-declared column families.

Columns themselves are byte arrays, and HBase can support millions of (possibly sparse) columns on a single table.
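
One way to picture this data model is a nested mapping in which each row stores only the columns it actually has. The sketch below is plain Python, not the HBase API: the table variable, the put helper, and the row contents are invented for illustration, while the family names and the "family:qualifier" form are borrowed from the content table shown earlier.

```python
from collections import defaultdict

# Column families are fixed when the table is created; the qualifier
# after the colon can be invented per row at write time.
COLUMN_FAMILIES = {"content", "info", "url"}

table = defaultdict(dict)   # row key -> {b"family:qualifier": value}


def put(row, column, value):
    """Store one cell, rejecting columns outside the declared families."""
    family = column.split(b":", 1)[0].decode()
    if family not in COLUMN_FAMILIES:
        raise KeyError("undeclared column family: %s" % family)
    table[row][column] = value


put(b"row-1", b"content:raw", b"<html>...</html>")
put(b"row-1", b"info:fetched", b"2010-07-09")
put(b"row-2", b"info:redirected", b"")   # an empty value is still a column

# Absent columns take no space: each row lists only what was written.
print(sorted(table[b"row-1"]))
print(sorted(table[b"row-2"]))
```

Neither row carries a placeholder for the columns it lacks, which is the property that makes very wide, sparse tables affordable.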

Bloom filters can be enabled within a column family to efficiently answer 'what columns exist for this row?'

Versioned Data
As data is added into a row/column, a timestamp is also attached.
Column families can keep a configurable number of prior versions of the data for a cell
Data can be queried/retrieved based on:
- latest timestamp (default)
- all timestamps
- data as it was at some specified date
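
The three retrieval modes can be illustrated with a toy in-memory cell. This is a sketch of the idea only, not HBase code: the VersionedCell class, its method names, and the integer timestamps are assumptions made for the example.

```python
import bisect


class VersionedCell:
    """Keep up to max_versions (timestamp, value) pairs, oldest first."""

    def __init__(self, max_versions=3):
        self.max_versions = max_versions
        self._timestamps = []
        self._values = []

    def put(self, timestamp, value):
        # Insert in timestamp order, then drop the oldest surplus versions.
        pos = bisect.bisect_right(self._timestamps, timestamp)
        self._timestamps.insert(pos, timestamp)
        self._values.insert(pos, value)
        surplus = len(self._timestamps) - self.max_versions
        if surplus > 0:
            del self._timestamps[:surplus]
            del self._values[:surplus]

    def latest(self):
        """Default lookup: the value with the newest timestamp."""
        return self._values[-1] if self._values else None

    def all_versions(self):
        """Every retained (timestamp, value) pair, newest first."""
        return list(zip(self._timestamps, self._values))[::-1]

    def as_of(self, timestamp):
        """The value as it was at the given time, or None if none is kept."""
        pos = bisect.bisect_right(self._timestamps, timestamp)
        return self._values[pos - 1] if pos else None


cell = VersionedCell(max_versions=3)
for ts, value in [(100, "v1"), (200, "v2"), (300, "v3"), (400, "v4")]:
    cell.put(ts, value)

print(cell.latest())
print(cell.all_versions())
print(cell.as_of(250))
```

With three versions kept, the write at timestamp 100 has already been discarded, so a lookup as of time 150 finds nothing.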
Cell contents can also have a Time to Live set, to auto-expire data.

Some HBase Caveats

Single Key
You have a single index to your table

The index is stored in lexicographic sort order

consequently... No Joins
There is no facility to automatically join search results of two tables.

in fact... No SQL
There is actually no SQL interface to HBase at all.

Configuration, queries, and updates are all performed via the HBase API

Data is generally not normalized
This actually isn't that big of a deal...

Even relational databases of large size generally begin throwing away normalization

HBase Schema Design Tips and Considerations

Think in terms of memcached
memcached has a similar single-key access pattern

Remember though, HBase allows you to query multiple columns for a particular index.
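
Because the single row key is kept in lexicographic (byte) order, the way keys are built decides which rows end up next to each other. A small sketch in plain Python with made-up keys; nothing here calls HBase, and prefix_range is a hypothetical helper name.

```python
import bisect

# Keys sort as byte strings, so unpadded numbers do not sort numerically.
unpadded = sorted([b"host-9", b"host-10", b"host-2"])
padded = sorted([b"host-0009", b"host-0010", b"host-0002"])
print(unpadded)
print(padded)


def prefix_range(sorted_keys, prefix):
    """Return the keys that start with prefix, using only the sort order.

    The stop key is the prefix with its last byte incremented, which
    sorts after every key that has the prefix (assumes the prefix is
    non-empty and its last byte is below 0xff).
    """
    stop = prefix[:-1] + bytes([prefix[-1] + 1])
    lo = bisect.bisect_left(sorted_keys, prefix)
    hi = bisect.bisect_left(sorted_keys, stop)
    return sorted_keys[lo:hi]


# Hypothetical composite keys: reversed domain name, then a date.
keys = sorted([
    b"com.example.www/2010-07-30",
    b"com.example.www/2010-07-31",
    b"com.example.mail/2010-07-31",
    b"org.python.www/2010-07-31",
])
print(prefix_range(keys, b"com.example."))
```

Putting the most commonly filtered field first in the key keeps related rows contiguous, so one range lookup replaces a scan of the whole table.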

Choose your index carefully, ideally considering your usage pattern

Sparse Columns are Your Friend
Unlike a relational database, empty columns don't incur a storage requirement to store a NULL value

Columns within a column family are byte strings, and their presence on a record can provide information just as well as the value associated with the cell

Isn't this all Java?
I want to code in...

hbase-thrift Python package
http://code.google.com/p/hbase-thrift/
or just:
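pip install hbase-thrift

Basic Thrift API

Establishing a connection

# Module layout assumed from the thrift and hbase-thrift packages
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase.Hbase import Client

def get_thrift_connection(host='localhost', port=9090):
    transport = TSocket.TSocket(host, port)
    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Client(protocol)

    # Connect the transport
    transport.open()
    return client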
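
The function above hands back a client, but nothing in it closes the socket. A minimal sketch of one way to tie a connection's lifetime to a with-block, assuming only that the transport has open() and close() methods, as Thrift's Python transports do; the opened helper and the FakeTransport stand-in are hypothetical names, used so the example runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def opened(transport):
    """Open a transport for the duration of a with-block, then close it."""
    transport.open()
    try:
        yield transport
    finally:
        transport.close()


class FakeTransport:
    """Stand-in so the sketch runs without a Thrift server."""

    def __init__(self):
        self.is_open = False

    def open(self):
        self.is_open = True

    def close(self):
        self.is_open = False


fake = FakeTransport()
with opened(fake) as transport:
    print(transport.is_open)
print(fake.is_open)
```

The finally clause closes the transport even when a call inside the block raises, which matters for long-running collection scripts.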

Initializing a Table
client.createTable(
ColumnDescriptor(col_name, options),
)

Setting/Updating a Row
Create a set of mutation objects:

table_name, index, mutations)
table_name, index, mutations, timestamp)

Fetching Data

Basic Cell Lookup
client.get(table_name, row, column)
table_name, row, columns)
client.getRow(table_name, row)

Versioned Lookups
client.getVer(table_name, row, column, number_of_versions)
row, column, timestamp, numversions)
table_name, row, timestamp)

"But what if I don't know my row key?"

Scanners
Brute force iterator, similar to a relational db cursor

Can be constrained to a prefix of an index.

Scanner API
scannerOpen(table, startRow, columns)
startRow, stopRow, columns)

scannerClose(id)

Other Resources

Dumbo
map/reduce API for Python

Cassandra
Another distributed database, similar in many respects to HBase

Cloudera Hadoop Distribution
Cloudera provides a role similar to a Linux distribution for a consistent (and patched) version of Hadoop

Summary
http://bit.ly/pyohio-hadoop-talk