Flume Sink
Current Configuration:
- Sink writes to a local file (one file per schema version)
- File will be loaded into Hadoop
- Workaround, since streaming posed a risk during Hadoop outages (24h buffer only)
- Waits for incoming Flume events from the Flume sources
- Schema events are cached by their checksum (see the sketch below)
- Can create filesystem/HDFS streams for Avro and non-Avro events
- Periodically checks for inactive output streams to send the final close
- Destination file path is calculated dynamically (regex-like)
- *remember* Avro files have the schema encoded in them
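A minimal sketch of the caching and per-version file handling described above, assuming Avro's generic Java API; class, method and path names are illustrative and not the actual sink code:

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Illustrative sketch only - not the actual sink implementation.
// One Avro container file per schema version, selected via the schema checksum.
public class VersionedAvroFileCache {

    // schema checksum -> cached schema (sent once by the source as a schema event)
    private final Map<String, Schema> schemasByChecksum = new HashMap<String, Schema>();
    // schema checksum -> open output stream ("one file per version")
    private final Map<String, DataFileWriter<GenericRecord>> writersByChecksum =
        new HashMap<String, DataFileWriter<GenericRecord>>();

    public void cacheSchema(String checksum, Schema schema) {
        schemasByChecksum.put(checksum, schema);
    }

    public void append(String checksum, GenericRecord record) throws IOException {
        DataFileWriter<GenericRecord> writer = writersByChecksum.get(checksum);
        if (writer == null) {
            Schema schema = schemasByChecksum.get(checksum);
            writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
            // destination file path would be calculated dynamically from the event meta information
            writer.create(schema, new File("/var/tmp/data-" + checksum + ".avro"));
            writersByChecksum.put(checksum, writer);
        }
        writer.append(record);
    }

    // called periodically for inactive streams to send the final close
    public void closeAll() throws IOException {
        for (DataFileWriter<GenericRecord> writer : writersByChecksum.values()) {
            writer.close();
        }
        writersByChecksum.clear();
    }
}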
Versioning on Flume Source
- A Flume TCP source might receive different schema versions from the AdServers (especially during rollouts)
- Each Flume event carries some meta information used for routing
- Schema Flume events trigger the creation of a checksum
- All packets are queued and transferred by Flume to the sink
Extending Avro Records
Extending a subrecord
- Add the new field to the schema (see the sketch below)
- Handle the new field in the code of the subrecord
- Provide the new binary (log producer)
- Provide the new schema (through self-built auto propagation)
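To illustrate why adding a field does not break records written with the old schema, here is a hedged sketch using the Avro generic Java API; the ClientID field and both schema strings are invented for the example, the point is only the resolution mechanism (the new field carries a default value):

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Illustrative sketch of Avro schema resolution (field names are examples, not the real schemas).
public class SchemaEvolutionSketch {
    public static void main(String[] args) throws Exception {
        Schema oldSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SGSHeader\",\"fields\":["
          + "{\"name\":\"VersionID\",\"type\":\"int\"}]}");
        // the new field gets a default, so records written with the old schema still resolve
        Schema newSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SGSHeader\",\"fields\":["
          + "{\"name\":\"VersionID\",\"type\":\"int\"},"
          + "{\"name\":\"ClientID\",\"type\":\"int\",\"default\":0}]}");

        // write a record with the old schema (what a not-yet-updated log producer would send)
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("VersionID", 1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
        encoder.flush();

        // read it back with the extended schema: ClientID falls back to its default
        GenericDatumReader<GenericRecord> reader =
            new GenericDatumReader<GenericRecord>(oldSchema, newSchema);
        GenericRecord resolved = reader.read(null,
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(resolved.get("ClientID")); // prints 0
    }
}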
Adding a subrecord
- Add the new subrecord to the schema as a union (see the sketch below)
- ["string", "null"] declares a schema that may be either a string or null.
- Provide the new binary
- Provide the new schema (through auto propagation)
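A hedged sketch of what such a union declaration can look like; the ExtraInfo subrecord and its field are invented for this example. Declaring the new subrecord as a union with null (and default null) means producers and readers that do not know it yet can simply leave it out:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Illustrative sketch only (the "ExtraInfo" subrecord is made up).
public class SubrecordUnionSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"DataAvroPacket\",\"fields\":["
          + "  {\"name\":\"VersionID\",\"type\":\"int\"},"
          + "  {\"name\":\"ExtraInfo\",\"default\":null,\"type\":[\"null\","
          + "    {\"type\":\"record\",\"name\":\"ExtraInfo\",\"fields\":["
          + "      {\"name\":\"Source\",\"type\":[\"string\",\"null\"]}]}]}]}");

        GenericRecord packet = new GenericData.Record(schema);
        packet.put("VersionID", 1);
        packet.put("ExtraInfo", null);   // union with null: the subrecord may be absent
        System.out.println(GenericData.get().validate(schema, packet)); // true
    }
}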
Avro Schema Example
Header Part
{
  "name": "DataAvroPacket",
  "fields": [
    {
      "name": "SGSHeader",
      "type": {
        "name": "SGSHeader",
        "fields": [
          {
            "name": "VersionID",
            "type": "int"
          }
        ],
        "type": "record"
      }
    },...
Avro in Detail
- We use only one schema for records of the same type
  (e.g. impressions on Display, Video)
- Fewer schemas make versioning easier to handle
- Does not always work out:
  - Different schemas for different producers for easy deployment
Two message types (see the sketch below):
- On startup, the schema in use is sent as the payload
- All following Flume events contain only the Avro data section
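A minimal sketch of the two message types, assuming the Avro generic Java API; the Flume-specific event framing is omitted, and the method names are illustrative only: the schema travels once as JSON, every later event carries only the binary-encoded data section.

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Illustrative sketch only: schema once on startup, data-only payloads afterwards.
public class TwoMessageTypes {

    // message type 1: sent once on startup, the schema JSON as payload
    static byte[] schemaMessage(Schema schema) {
        return schema.toString().getBytes(Charset.forName("UTF-8"));
    }

    // message type 2: all following events carry only the Avro data section,
    // no embedded schema - the sink resolves it via the cached checksum
    static byte[] dataMessage(Schema schema, GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}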
Analysis
Use the right tool for the job!
Why?
- Scales across multiple nodes
- Read oriented
- 'Complete' SQL functionality
- Fast Bulk Loads fit Hadoop aggregation output
Architecture
PIG Aggregation Example
AvroStorage
Avro Support
Hourly Aggregation:
REGISTER /usr/lib/pig/contrib/piggybank/java/lib/avro-1.5.4.jar
%default INFILE '/var/tmp/example1.avro'
...
rec1 = LOAD '$INFILE'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage('{}');
rec1Data = FOREACH rec1 GENERATE SGSMainPacket.PlacementId, SGSMainPacket.CampaignId,
    SGSMainPacket.BannerNumber, $REP_DATE AS DATE, $REP_HOUR AS HOUR;
recGroup = GROUP rec1Data BY (PlacementId, CampaignId, BannerNumber, DATE, HOUR);
fullCount = FOREACH recGroup GENERATE
    1, -- VERSION COUNTER
    group.PlacementId, group.CampaignId, group.BannerNumber, group.DATE, group.HOUR,
    COUNT(rec1Data) AS TOTAL;
STORE fullCount INTO '$OUTFILE'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage('
    {
      "schema":
      { "name"  : "SummaryHourly",
        "type"  : "record",
        "fields": [
          { "name": "Version",      "type": "int" },
          { "name": "PlacementId",  "type": "int" },
          { "name": "CampaignId",   "type": "int" },
          { "name": "BannerNumber", "type": "int" },
          { "name": "DateEntered",  "type": "int" },
          { "name": "Hour",         "type": "int" },
          { "name": "COUNT",        "type": "long" }
        ]
      }
    }');
AvroStorage
- Component that reads and writes Avro
Issues (our version)
- Does not support globs
  - Globs allow selecting files with a regex-like pattern (see the sketch after this list)
- Does not support reading different schemas
- Has limitations on schema union
Fix
- Patch applied successfully to the Cloudera distro
- Helped test it
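For illustration only, this is the kind of glob pattern supported elsewhere in Hadoop; a hedged sketch using the HDFS Java API with invented paths, unrelated to the AvroStorage patch itself:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch: list all files matching a glob pattern (paths are made up).
public class GlobExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // matches e.g. /logs/2012-10-01/00/part-00000.avro, /logs/2012-10-02/13/part-00007.avro, ...
        FileStatus[] matches = fs.globStatus(new Path("/logs/2012-10-*/*/part-*.avro"));
        if (matches != null) {
            for (FileStatus status : matches) {
                System.out.println(status.getPath());
            }
        }
    }
}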
SQL, NoSQL, NewSQL?
Analysis
Use the right tool for the job!
Why?
- Log aggregation framework
- Failover
- Flexible routing
- Java (in comparison to Scribe)
- Potentially a better fit for Hadoop
Aggregation
Use the right tool for the job!
- Every solution has its strengths and weaknesses
- If there's no weakness, your evaluation isn't good enough
- Use the tool as designed
- Don't go for performance charts!
- Do your own measurements
- Think about stability
- Think about maintenance
- What do you really need?
Why Hadoop?
- IO throughput
- Scalability
- Map/Reduce concept
- Perfect replacement for ADTECH
Why PIG?
- Easier than native jobs
- Less code
Why HIVE?
- Easy to adopt
- RDBMS knowledge transfer
Logging Data
Use the right tool for the job!
Why?
- Structured data (like XML)
- Binary (like Protocol Buffers)
- Compressed encoding
- Fast serialization
- Fast deserialization
- Schema awareness
- JSON schema
- We do not use the transport layer!
Vertica Lessons Learned
Hadoop Lessons Learned
What's Next?
Flume Lessons Learned
Avro Lessons Learned
Aggregations with Avro - Unions
Why?
- Scales across multiple nodes
- Read oriented
- 'Complete' SQL functionality
- Fast bulk loads fit Hadoop aggregation output
Our apples & oranges metrics (vs. MySQL):
- Loads 5-10x faster
- 1k replayed report queries: 10x faster
- Ad hoc queries: 4x faster
Data Management API
Use the right tool for the job!
+ JDBC & ODBC driver integration
+ Very good performance on complex queries
+ Load times
+ Quite good SQL support
+ Migration from MySQL went more smoothly than expected
- Not well suited for small updates
- Loading data with primary-key clashes breaks
- Rethink the built-in revisioning
+ Flexible dataflow
+ Throughput looks good (approx. 30 MB/sec)
+ Sources/sinks are easy to modify and extend
+ Log changes are possible without aggregation changes
+ Hardware specs are moderate
- n/2 + 1 nodes for failover is not well suited for cross-datacenter flows
- Needed more nodes than expected
- Saw an impact on the complete flow when one network interface was wrongly configured to 100 MBit only
- REST API for customers
- Running HIVE SQL dynamically
- Manage Big Data yourself
+ Quite stable
+ Updates went well (node by node)
+ IO rates
+ Data partitioning is key
- Performance on 2 clusters with the same machines was very different - look into config & network tuning!
- Saw long startup times
- Synchronizing files can be problematic due to non-blocking behaviour
- Configuration is extraordinarily complex, too many parameters
- No good understanding of what slows jobs down
- Interpretation of job runtime information (Starfish)
- FUSE very unstable, losing file handles - use the HDFS API!
Transport Layer
Unions
- Work only on the same schema
- Throw an error otherwise
Schema Union
- Union on flat schemas is available
- Does not work on subschemas
Fixed Schema Support
- Currently testing the available patch
- Likely to go into a future PIGGYBANK release
+ Structure support
+ Built-in schemas
+ Java API is well supported and seems consistent
+ Versioning works well when things get added
+ No schema compile necessary
- C is neither well documented nor intuitive to use
- C has no transport layer support
- C++ seems to suffer from the same issues as C, but we have not investigated it deeply
- Not built for convenient programming
  (no XML/JSON replacement)
- Not 100% supported out of the box with PIG/HIVE
  (should be fixed in one of the next releases!)
Questions
dennis.meyer@adtech.com
uwe.seiler@codecentric.de
@uweseiler
Dennis Meyer + Uwe Seiler
- Industry: Digital Advertising (Display, Mobile, Video)
- Main product: AdServer
- Founded in 1998
- An AOL company
- 500+ customers worldwide
- 6 billion ad impressions/day
- 4 data centers in EU/US
- Development in Dreieich, Dublin, Serbia (codecentric)
Just an awesome bunch of...
Guess what? We're hiring!
Big Data Nerds
Agile Ninjas
Continuous Delivery Gurus
Enterprise Java Specialists
Map/Reduce in Action:
Large Scale Reporting
Based on Hadoop and Vertica
AdServer?
Performance Geeks
The dilemma about "Big Data"
"Business wants near real-time, but without penalties or data loss, with endless scalability, zero-latency and 100% consistency."
Serving Layer
- Indexes and exposes the views for querying
- Batch views are static, only random reads required
Technologies
(Diagram labels: JavaScript, C++, reporting core, SQL extension)
Interested?
We're looking for developers!!!
Speed Layer
- Computes incremental, transient real-time views
- Compensates for the high latency of the batch layer
Pick your poison(s)...
...and bring them all together
Batch Layer
- Stores the immutable, constantly growing master dataset
- Iteratively computes custom views from the master dataset via Map/Reduce
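To make the interplay of the three layers concrete, a minimal conceptual sketch; all class, view and key names are invented. Roughly speaking, in the pipeline described earlier the batch views correspond to the Hadoop/PIG aggregations and the serving layer to Vertica, while the speed layer is the generic Lambda-style real-time complement.

import java.util.HashMap;
import java.util.Map;

// Conceptual sketch only (all names invented) of how the three layers cooperate.
public class LambdaArchitectureSketch {

    // Batch layer output: views recomputed from the immutable master dataset (e.g. via Map/Reduce)
    private final Map<String, Long> batchView = new HashMap<String, Long>();

    // Speed layer output: incremental, transient real-time view
    private final Map<String, Long> realtimeView = new HashMap<String, Long>();

    // Serving layer: random reads over the indexed batch view,
    // merged with the speed layer to compensate for the batch latency
    public long query(String key) {
        long batch = batchView.containsKey(key) ? batchView.get(key) : 0L;
        long realtime = realtimeView.containsKey(key) ? realtimeView.get(key) : 0L;
        return batch + realtime;
    }
}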