OPEN SOURCE LAMBDA ARCHITECTURE KAFKA · HADOOP · SAMZA · DRUID FANGJIN YANG · GIAN MERLINO · DRUID COMMITTERS
OVERVIEW
PROBLEM: DEALING WITH EVENT DATA
MOTIVATION: EVOLUTION OF A “REAL-TIME” STACK
ARCHITECTURE: THE “RAD”-STACK
NEXT STEPS: TRY IT OUT FOR YOURSELF
THE PROBLEM
2013 THE PROBLEM ‣ Arbitrary and interactive exploration of time series data • Ad-tech, system/app metrics, network/website traffic analysis ‣ Multi-tenancy: lots of concurrent users ‣ Scalability: 10+ TB/day, ad-hoc queries on trillions of events ‣ Recency matters! Real-time analysis
2013 FINDING A SOLUTION ‣ Load all your data into Hadoop. Query it. Done! ‣ Good job guys, let’s go home
2013 FINDING A SOLUTION: Event Streams → Hadoop → Insight
2013 PROBLEMS WITH THE NAIVE SOLUTION ‣ MapReduce can handle almost every distributed computing problem ‣ MapReduce over your raw data is flexible but slow ‣ Hadoop is not optimized for query latency ‣ To optimize queries, we need a query layer
2013 FINDING A SOLUTION: Event Streams → Hadoop (pre-processing and storage) → Query Layer → Insight
A FASTER QUERY LAYER
2013 MAKE QUERIES FASTER ‣ What types of queries to optimize for? • Revenue over time broken down by demographic • Top publishers by clicks over the last month • Number of unique visitors broken down by any dimension • Not dumping the entire dataset • Not examining individual events
2013 FINDING A SOLUTION: Event Streams → Hadoop (pre-processing and storage) → RDBMS → Insight
2013 FINDING A SOLUTION: Event Streams → Hadoop (pre-processing and storage) → NoSQL K/V Stores → Insight
2013 FINDING A SOLUTION: Event Streams → Hadoop (pre-processing and storage) → Commercial Databases → Insight
DRUID AS A QUERY LAYER
2013 DRUID ‣ Druid project started in 2011, went open source in 2012 ‣ Designed for low latency ingestion and ad-hoc aggregations ‣ Designed for keeping around a lot of history (years are ok) ‣ Growing Community • ~100 contributors • Used in production at numerous large and small organizations
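As a concrete illustration of the ad-hoc aggregations Druid is designed for, here is a minimal sketch that POSTs a topN query ("top publishers by clicks over a month") to a Druid broker over HTTP. The broker URL, datasource, and column names are placeholders for this sketch, not details from the deck.

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TopPublishersQuery {
  public static void main(String[] args) throws Exception {
    // "Top publishers by clicks over the last month" as a Druid topN query.
    String query =
        "{\n" +
        "  \"queryType\": \"topN\",\n" +
        "  \"dataSource\": \"impressions\",\n" +
        "  \"intervals\": [\"2011-01-01/2011-02-01\"],\n" +
        "  \"granularity\": \"all\",\n" +
        "  \"dimension\": \"publisher\",\n" +
        "  \"metric\": \"clicks\",\n" +
        "  \"threshold\": 10,\n" +
        "  \"aggregations\": [{\"type\": \"longSum\", \"name\": \"clicks\", \"fieldName\": \"clicks\"}]\n" +
        "}";

    // Placeholder broker address; queries are POSTed to the broker's /druid/v2/ endpoint.
    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://broker.example.com:8082/druid/v2/").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    conn.getOutputStream().write(query.getBytes(StandardCharsets.UTF_8));

    try (InputStream in = conn.getInputStream()) {
      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
  }
}
```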
2014 DRUID IN PRODUCTION: REAL-TIME INGESTION
>500K EVENTS / SECOND AVERAGE
>1M EVENTS / SECOND PEAK
10 – 100K EVENTS / SECOND / CORE
2014 DRUID IN PRODUCTION: QUERY LATENCY (500MS AVERAGE)
[Chart: query latency percentiles (90%ile, 95%ile, 99%ile) per datasource, Feb 03 – Feb 24]
90% < 1S
95% < 5S
99% < 10S
2013 RAW DATA
timestamp             publisher          advertiser   gender   country   click   price
2011-01-01T01:01:35Z  bieberfever.com    google.com   Male     USA       0       0.65
2011-01-01T01:03:63Z  bieberfever.com    google.com   Male     USA       0       0.62
2011-01-01T01:04:51Z  bieberfever.com    google.com   Male     USA       1       0.45
...
2011-01-01T01:00:00Z  ultratrimfast.com  google.com   Female   UK        0       0.87
2011-01-01T02:00:00Z  ultratrimfast.com  google.com   Female   UK        0       0.99
2011-01-01T02:00:00Z  ultratrimfast.com  google.com   Female   UK        1       1.53
2013 ROLLUP DATA
timestamp             publisher          advertiser   gender   country   impressions   clicks   revenue
2011-01-01T01:00:00Z  ultratrimfast.com  google.com   Male     USA       1800          25       15.70
2011-01-01T01:00:00Z  bieberfever.com    google.com   Male     USA       2912          42       29.18
2011-01-01T02:00:00Z  ultratrimfast.com  google.com   Male     UK        1953          17       17.31
2011-01-01T02:00:00Z  bieberfever.com    google.com   Male     UK        3194          170      34.01
‣ Truncate timestamps ‣ GroupBy over string columns (dimensions) ‣ Aggregate numeric columns (metrics)
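The rollup step is a plain group-and-aggregate. A minimal in-memory sketch of the idea (not Druid's actual ingestion code): truncate each timestamp to the hour, group by the string dimensions, and sum the numeric metrics.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Rollup {
  // A raw event: one ad impression with its dimensions and metrics.
  record Event(Instant timestamp, String publisher, String advertiser,
               String gender, String country, int click, double price) {}

  // One rolled-up row: counts and sums per (hour, dimension combination).
  static class Row {
    long impressions;
    long clicks;
    double revenue;
  }

  static Map<String, Row> rollup(List<Event> events) {
    Map<String, Row> rows = new HashMap<>();
    for (Event e : events) {
      // Truncate the timestamp to the hour, then group by the string dimensions.
      Instant hour = e.timestamp().truncatedTo(ChronoUnit.HOURS);
      String key = hour + "|" + e.publisher() + "|" + e.advertiser()
          + "|" + e.gender() + "|" + e.country();
      Row row = rows.computeIfAbsent(key, k -> new Row());
      // Aggregate the numeric metrics.
      row.impressions += 1;
      row.clicks += e.click();
      row.revenue += e.price();
    }
    return rows;
  }
}
```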
2013 PARTITION DATA
timestamp             publisher          advertiser   gender   country   impressions   clicks   revenue
2011-01-01T01:00:00Z  ultratrimfast.com  google.com   Male     USA       1800          25       15.70
2011-01-01T01:00:00Z  bieberfever.com    google.com   Male     USA       2912          42       29.18
2011-01-01T02:00:00Z  ultratrimfast.com  google.com   Male     UK        1953          17       17.31
2011-01-01T02:00:00Z  bieberfever.com    google.com   Male     UK        3194          170      34.01
‣ Shard data by time ‣ Immutable chunks of data called “segments”
Segment 2011-01-01T01/2011-01-01T02 (the 01:00 rows)
Segment 2011-01-01T02/2011-01-01T03 (the 02:00 rows)
2013 IMMUTABLE SEGMENTS ‣ Fundamental storage unit in Druid ‣ Read consistency ‣ One thread scans one segment ‣ Multiple threads can access same underlying data ‣ Segments are sized so that computation over one completes in ms ‣ Simplifies distribution & replication
2013 COLUMN ORIENTATION
timestamp             publisher          advertiser   gender   country   impressions   clicks   revenue
2011-01-01T01:00:00Z  ultratrimfast.com  google.com   Male     USA       1800          25       15.70
2011-01-01T01:00:00Z  bieberfever.com    google.com   Male     USA       2912          42       29.18
‣ Scan/load only what you need ‣ Compression! ‣ Indexes!
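A toy sketch of the column-oriented idea, assuming dictionary encoding plus bitmap indexes (the general approach Druid takes for string columns; this is not Druid's implementation): each distinct value is stored once, rows hold small integer ids, and a filter becomes a bitmap lookup instead of a string scan.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StringColumn {
  // Dictionary: each distinct value ("bieberfever.com", "ultratrimfast.com", ...)
  // is stored once and mapped to a small integer id.
  private final Map<String, Integer> dictionary = new HashMap<>();
  private final List<String> reverseDictionary = new ArrayList<>();

  // The column itself is just the list of integer ids, one per row.
  private final List<Integer> rows = new ArrayList<>();

  // One bitmap per dictionary value: bit i is set if row i has that value.
  private final List<BitSet> bitmaps = new ArrayList<>();

  public void add(String value) {
    int id = dictionary.computeIfAbsent(value, v -> {
      reverseDictionary.add(v);
      bitmaps.add(new BitSet());
      return dictionary.size();
    });
    bitmaps.get(id).set(rows.size());
    rows.add(id);
  }

  // Filtering "publisher = x" is a bitmap lookup, not a scan of raw strings.
  public BitSet filter(String value) {
    Integer id = dictionary.get(value);
    return id == null ? new BitSet() : (BitSet) bitmaps.get(id).clone();
  }

  // Decode an id back to its string value (used when returning results).
  public String value(int id) {
    return reverseDictionary.get(id);
  }
}
```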
DRUID INGESTION ‣ Must have denormalized, flat data ‣ Druid cannot do stateful processing at ingestion time ‣ …like stream-stream joins ‣ …or user session reconstruction ‣ …or a bunch of other useful things! ‣ Many Druid users need an ETL pipeline
2013 DRUID REAL-TIME INGESTION
Data Source → (immediate) → Druid Realtime Workers → (periodic) → Druid Historical Nodes
User queries → Druid Broker Nodes
A Stream Processor can sit between the Data Source and the Druid Realtime Workers.
STREAMING DATA PIPELINES
AN EXAMPLE: ONLINE ADS ‣ Input data: impressions, clicks, ID-to-name mappings ‣ Output: enhanced impressions ‣ Steps ‣ Join impressions with clicks -> “clicks” ‣ Look up IDs to names -> “advertiser”, “publisher”, … ‣ Geocode -> “country”, … ‣ Lots of other additions
PIPELINE: Impressions, Clicks → ? → Druid
PIPELINE
Impressions
  Partition 0: {key: 186bd591-9442-48f0, publisher: foo, …}, {key: 9b5e2cd2-a8ac-4232, publisher: qux, …}, …
  Partition 1: {key: 1079026c-7151-4871, publisher: baz, …}, …
Clicks
  Partition 0: …
  Partition 1: {key: 186bd591-9442-48f0}, …
PIPELINE: Impressions, Clicks → Shuffle → Shuffled → Druid
PIPELINE
Shuffled
  Partition 0: {type: impression, key: 186bd591-9442-48f0, publisher: foo, …}, {type: impression, key: 1079026c-7151-4871, publisher: baz, …}, {type: click, key: 186bd591-9442-48f0}, …
  Partition 1: {type: impression, key: 9b5e2cd2-a8ac-4232, publisher: qux, …}, …
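The shuffle stage can be a single Samza task. A hedged sketch, assuming JSON messages deserialized to Maps and topic names matching the diagram: every impression and click is re-emitted keyed by the impression key, so matching records land in the same partition of the shuffled topic.

```java
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Shuffle step: consumes the impressions and clicks topics and re-emits every
 * message to "shuffled", keyed by the impression key. Topic names and the Map
 * message format are assumptions for this sketch.
 */
public class ShuffleTask implements StreamTask {
  private static final SystemStream SHUFFLED = new SystemStream("kafka", "shuffled");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    @SuppressWarnings("unchecked")
    Map<String, Object> message = (Map<String, Object>) envelope.getMessage();

    // Tag the message with its origin so the join step can tell them apart.
    String stream = envelope.getSystemStreamPartition().getStream();
    message.put("type", "clicks".equals(stream) ? "click" : "impression");

    // Emit keyed by the join key; Kafka partitions by key, so an impression
    // and its click end up in the same partition of "shuffled".
    String key = (String) message.get("key");
    collector.send(new OutgoingMessageEnvelope(SHUFFLED, key, message));
  }
}
```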
PIPELINE: Impressions, Clicks → Shuffle → Shuffled → Join → Joined → Druid
PIPELINE
Joined
  Partition 0: {key: 186bd591-9442-48f0, is_clicked: true, publisher: foo, …}, {key: 1079026c-7151-4871, is_clicked: false, publisher: baz, …}, …
  Partition 1: {key: 9b5e2cd2-a8ac-4232, is_clicked: false, publisher: qux, …}, …
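The join stage is where Samza's local state helps. A hedged sketch, assuming the same Map message format and a configured key-value store named "impressions": impressions are buffered in the store, a click within the window emits the impression with is_clicked true, and the periodic window callback flushes the rest as un-clicked. Per-impression expiry timestamps are omitted to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class JoinTask implements StreamTask, InitableTask, WindowableTask {
  private static final SystemStream JOINED = new SystemStream("kafka", "joined");

  private KeyValueStore<String, Map<String, Object>> impressions;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // Local store; Samza logs changes to Kafka and restores them on failure.
    impressions = (KeyValueStore<String, Map<String, Object>>) context.getStore("impressions");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    @SuppressWarnings("unchecked")
    Map<String, Object> message = (Map<String, Object>) envelope.getMessage();
    String key = (String) message.get("key");

    if ("click".equals(message.get("type"))) {
      // A click arrived: emit the buffered impression as clicked, if we have it.
      Map<String, Object> impression = impressions.get(key);
      if (impression != null) {
        impression.put("is_clicked", true);
        collector.send(new OutgoingMessageEnvelope(JOINED, key, impression));
        impressions.delete(key);
      }
    } else {
      // An impression arrived: buffer it until a click shows up or the window closes.
      impressions.put(key, message);
    }
  }

  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // Window fired (task.window.ms): flush everything still buffered as un-clicked.
    List<Entry<String, Map<String, Object>>> buffered = new ArrayList<>();
    KeyValueIterator<String, Map<String, Object>> it = impressions.all();
    try {
      while (it.hasNext()) {
        buffered.add(it.next());
      }
    } finally {
      it.close();
    }
    for (Entry<String, Map<String, Object>> entry : buffered) {
      Map<String, Object> impression = entry.getValue();
      impression.put("is_clicked", false);
      collector.send(new OutgoingMessageEnvelope(JOINED, entry.getKey(), impression));
      impressions.delete(entry.getKey());
    }
  }
}
```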
PIPELINE: Impressions, Clicks → Shuffle → Shuffled → Join → Joined → Enhance & Output → Druid
ALTERNATIVE PIPELINE: Impressions, Clicks → Shuffle → Shuffled → Join → Joined → Enhance → Enhanced → Druid
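The enhance stage is a lookup against reference data. A hedged sketch for the alternative pipeline, assuming an ID-to-name mapping stream (called "mappings" here) feeds a local store and the joined records carry a hypothetical publisher_id field: replace the id with its name and write the result to the enhanced topic that Druid ingests. Geocoding and the other enrichments are left out.

```java
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class EnhanceTask implements StreamTask, InitableTask {
  private static final SystemStream ENHANCED = new SystemStream("kafka", "enhanced");

  private KeyValueStore<String, String> names;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // Local store holding the latest ID-to-name mappings.
    names = (KeyValueStore<String, String>) context.getStore("id-to-name");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    @SuppressWarnings("unchecked")
    Map<String, Object> message = (Map<String, Object>) envelope.getMessage();

    if ("mappings".equals(envelope.getSystemStreamPartition().getStream())) {
      // ID-to-name mapping update: remember it for future lookups.
      names.put((String) message.get("id"), (String) message.get("name"));
      return;
    }

    // Joined impression: swap the publisher id for its name before output.
    String publisherName = names.get((String) message.get("publisher_id"));
    message.put("publisher", publisherName != null ? publisherName : "unknown");
    collector.send(new OutgoingMessageEnvelope(ENHANCED, (String) message.get("key"), message));
  }
}
```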
REPROCESSING
WHY REPROCESS DATA? ‣ Bugs in processing code ‣ Imprecise streaming operations ‣ …like using short join windows ‣ Limitations of current software ‣ …Kafka 0.8.x, Samza 0.9.x can generate duplicate messages ‣ …Druid 0.7.x streaming ingestion is best-effort
LAMBDA ARCHITECTURES ‣ Hybrid batch/streaming data pipeline ‣ Batch technologies • Hadoop MapReduce • Spark ‣ Streaming technologies • Samza • Storm • Spark Streaming
LAMBDA ARCHITECTURES ‣ Advantages? • Works as advertised • Works with a huge variety of open software • Druid supports batch-replace-by-time-range through Hadoop
LAMBDA ARCHITECTURES ‣ Disadvantages? ‣ Need code to run on two very different systems ‣ Maintaining two codebases is perilous ‣ …productivity loss ‣ …code drift ‣ …difficulty training new developers
LAMBDA ARCHITECTURES
Data → streaming path
Data → batch path
The same data flows through both the streaming path and the batch path.
KAPPA ARCHITECTURE ‣ Pure streaming ‣ Reprocess data by replaying the input stream ‣ Doesn’t require operating two systems ‣ Doesn’t overcome software limitations ‣ I don’t have much experience with this ‣ http://radar.oreilly.com/2014/07/questioning-the-lambda-architecture.html
OPERATIONS
NICE THINGS ABOUT KAFKA ‣ Scalable, replicated pub/sub ‣ Replayable message logs ‣ New consumers can read all old messages ‣ Existing consumers can reprocess all old messages
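Replay in practice, sketched with the newer Kafka Java consumer (the 0.8-era high-level consumer has a different API): pin a partition, seek to the beginning, and re-read every retained message. Broker address, topic, group id, and the single-partition assignment are placeholders for this sketch.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka.example.com:9092");
    props.put("group.id", "reprocessing-job");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Assign one partition explicitly and rewind it to the earliest retained offset.
      TopicPartition partition = new TopicPartition("impressions", 0);
      consumer.assign(Collections.singletonList(partition));
      consumer.seekToBeginning(Collections.singletonList(partition));

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          // Reprocess the old message here.
          System.out.println(record.offset() + ": " + record.value());
        }
      }
    }
  }
}
```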
NICE THINGS ABOUT SAMZA ‣ Multi-tenancy: one main thread per container ‣ Robustness: isolated containers limit slowness and failure ‣ Visibility ‣ Multistage jobs, lots of metrics per stage ‣ Can inspect the message queue in Kafka ‣ State is simple ‣ Logging and restoring handled for you ‣ Single-threaded programming
NICE THINGS ABOUT DRUID ‣ Fast ingestion, fast queries ‣ Seamlessly merge stream-ingested and batch-ingested data ‣ Batch loads can “replace” stream loads for the same time range
NICE THINGS ABOUT HADOOP ‣ Solid batch processing system ‣ Easy to partition and reprocess data by time range ‣ Jobs can process all data, or a pre-partitioned slice
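Reprocessing a time slice with Hadoop can be as simple as a mapper that filters on the interval being rebuilt. A hedged sketch, assuming tab-separated input with the timestamp in the first field: keep only events in the target range and key them by hour so the reducer can re-aggregate just that slice.

```java
import java.io.IOException;
import java.time.Instant;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TimeRangeMapper extends Mapper<LongWritable, Text, Text, Text> {
  // The interval being rebuilt; hard-coded here, a real job would take it from config.
  private static final Instant START = Instant.parse("2011-01-01T00:00:00Z");
  private static final Instant END = Instant.parse("2011-01-02T00:00:00Z");

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String[] fields = line.toString().split("\t", 2);
    Instant timestamp = Instant.parse(fields[0]);

    // Only reprocess the interval we are rebuilding; everything else is skipped.
    if (!timestamp.isBefore(START) && timestamp.isBefore(END)) {
      // Key by hour so each reduce call sees one hour of events to re-aggregate.
      String hour = timestamp.toString().substring(0, 13);
      context.write(new Text(hour), line);
    }
  }
}
```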
MONITORING ‣ Kafka partition availability ‣ Kafka log cleaner ‣ Samza consumer offsets ‣ Druid ingestion process rate ‣ Druid ingestion drop rate ‣ Druid query latency ‣ System metrics: CPU, network, disk ‣ Event counts at various stages
STREAM METRICS
DO TRY THIS AT HOME
2013 CORNERSTONES ‣ Druid - druid.io - @druidio ‣ Samza - samza.apache.org - @samzastream ‣ Kafka - kafka.apache.org - @apachekafka ‣ Hadoop - hadoop.apache.org
GLUE ‣ Tranquility (streams → Druid real-time ingestion) ‣ Camus / Secor (Kafka → HDFS) ‣ Druid Hadoop indexer (HDFS → Druid segments)
GLUE ‣ druid-kafka-eight (Kafka → Druid real-time ingestion) ‣ Camus / Secor (Kafka → HDFS) ‣ Druid Hadoop indexer (HDFS → Druid segments)
TAKEAWAYS ‣ Consider Kafka for making your streams available ‣ Consider Samza for streaming data integration ‣ Consider Druid for interactive exploration of streams ‣ Metrics, metrics, metrics ‣ Have a reprocessing strategy if you’re interested in historical data
THANK YOU
