Building a Versatile Analytics Pipeline On Top Of Apache Spark Misha Chernetsov, Grammarly Spark Summit 2017 June 6, 2017
About Me: Misha Chernetsov (@chernetsov) — Data Team Lead @ Grammarly; building analytics pipelines (5 years); coding on JVM (12 years), Scala + Spark (3 years)
Tool that helps us better understand: ● Who are our users? ● How do they interact with the product? ● How do they get in, engage, pay, and how long do they stay? Analytics @ Consumer Product Company
We want our decisions to be data-driven Everyone: product managers, marketing, engineers, support... Analytics @ Consumer Product Company
Analytics @ Consumer Product Company (diagram): data → analytics → report
Example Report 1 – Daily Active Users: number of unique active users by calendar day (dummy data)
Example Report 2 – Comparison of Cohort Retention Over Time (dummy data)
Example Report 3 – Payer Conversions By Traffic Source: number of users who bought a subscription by calendar day, split by traffic source type (where the user came from): Ads, Email, Social (dummy data)
● Landing page visit ○ URL with UTM tags ○ Referrer ● Subscription purchased ○ Is first in subscription Example: Data
Example: Data — everything is an event
{ "eventName": "page-visit", "url": "...?utm_medium=ad", … }
{ "eventName": "subscribe", "period": "12 months", … }
Example: Data — enrich and/or join the events, then slice and plot
{ "eventName": "page-visit", "url": "...?utm_medium=ad", … }
{ "eventName": "subscribe", "period": "12 months", … }
capture enrich index query Analytics @ Consumer Product Company
capture enrich index query Use 3rd Party? 1. Integrated Event Analytics 2. UI over your DB Reports are not tailored for your needs, limited capability. Pre-aggregation / enriching is still on you. Hard to achieve accuracy and trust.
capture enrich index query Build Step 1: Capture
Capture: REST → Kafka
● Always up, resilient
● Spikes / back pressure
● Buffer for delayed processing
{ "eventName": "page-visit", "url": "...?utm_medium=paid", … }
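To make the capture step concrete, here is a minimal sketch of the REST-to-Kafka handoff, assuming a plain Kafka producer; the broker list, topic name, and object names are illustrative, not the talk's actual service.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object CaptureSketch {
  // Assumed broker list and topic name; purely illustrative.
  private val props = new Properties()
  props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  // Called by the REST layer for every incoming event (JSON payload as a string).
  // The producer buffers and sends asynchronously, which absorbs short spikes.
  def capture(eventJson: String): Unit =
    producer.send(new ProducerRecord[String, String]("raw-events", eventJson))
}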
Save To Long-Term Storage: Kafka → Stream → long-term storage
Save To Long-Term Storage: Kafka → micro-batch → Cassandra

val rdd = KafkaUtils.createRDD[K, V](...)
rdd.saveToCassandra("raw")
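A slightly fuller sketch of this micro-batch save, assuming the Kafka 0.10 integration (spark-streaming-kafka-0-10) and the DataStax Spark-Cassandra connector; the topic, offsets, keyspace, table, and column names are assumptions, not the talk's actual values.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import com.datastax.spark.connector._

def saveBatch(sc: SparkContext): Unit = {
  val kafkaParams = new java.util.HashMap[String, Object]()
  kafkaParams.put("bootstrap.servers", "kafka-1:9092")
  kafkaParams.put("key.deserializer", classOf[StringDeserializer])
  kafkaParams.put("value.deserializer", classOf[StringDeserializer])
  kafkaParams.put("group.id", "raw-archiver")

  // Offsets for one micro-batch (topic, partition, from, until);
  // in practice these would come from checkpointed state.
  val offsets = Array(OffsetRange("events", 0, 0L, 100000L))

  val rdd = KafkaUtils.createRDD[String, String](
    sc, kafkaParams, offsets, LocationStrategies.PreferConsistent)

  // Persist the raw records as-is for long-term storage.
  rdd.map(record => (record.key, record.value))
    .saveToCassandra("analytics", "raw", SomeColumns("key", "value"))
}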
capture enrich index query Build Step 2: Enrich
Enrichment 1: User Attribution
Enrichment 1: User Attribution { "eventName": "page-visit", "url": "...?utm_medium=ad", "fingerprint": "abc", … } { "eventName": "subscribe", "userId": 123, "fingerprint": "abc", … }
Enrichment 1: User Attribution { "eventName": "page-visit", "url": "...?utm_medium=ad", "fingerprint": "abc", … } { "eventName": "subscribe", "userId": 123, "fingerprint": "abc", … } "attributedUserId": 123,
Enrichment 1: User Attribution (timeline diagram): all events from a given browser (fingerprint = abc) over time — non-authenticated events (userId = null), then authenticated events (userId = 123)
Enrichment 1: User Attribution (timeline diagram): the non-authenticated events (userId = null) from that browser (fingerprint = abc) get attributedUserId = 123 from the later authenticated events (userId = 123)
Enrichment 1: User Attribution (timeline diagram): the same browser (fingerprint = abc) can have authenticated events from userId = 123 and later from userId = 756; heuristics attribute the events in between
Enrichment 1: User Attribution

rdd.mapPartitions { iterator =>
  val buffered = iterator.buffered
  val buffer = ArrayBuffer.empty[Event]  // Event: the pipeline's event type
  // Collect leading non-authenticated events until the first authenticated one.
  while (buffered.hasNext && buffered.head.userId.isEmpty) {
    buffer.append(buffered.next())
  }
  // userId of the first authenticated event (if any) in this browser's timeline.
  val userId = if (buffered.hasNext) buffered.head.userId else None
  buffer.iterator.map(_.setAttributedUserId(userId)) ++ buffered
}

Can grow big and OOM your worker for outliers who use Grammarly without ever registering.
Spark & Memory: by default our code operates in User Memory, the small fraction (100% − spark.memory.fraction = 25%); Spark Memory (spark.memory.fraction = 75%) is the large fraction. Let's get into Spark Memory and use its safety features.
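For reference, a minimal sketch of pinning that split explicitly via SparkConf (0.75 matches the slide; your cluster defaults may differ):

import org.apache.spark.SparkConf

// spark.memory.fraction controls the share of heap given to Spark's unified
// execution + storage memory; the remainder is "user memory" for application code.
val conf = new SparkConf()
  .setAppName("analytics-pipeline")
  .set("spark.memory.fraction", "0.75")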
Enrichment 1: User Attribution

rdd.mapPartitions { iterator =>
  val buffered = iterator.buffered
  val buffer = new SpillableBuffer()  // custom spillable collection (next slides)
  while (buffered.hasNext && buffered.head.userId.isEmpty) {
    buffer.append(buffered.next())
  }
  val userId = if (buffered.hasNext) buffered.head.userId else None
  buffer.iterator.map(_.setAttributedUserId(userId)) ++ buffered
}

Can safely grow in memory while there is enough free Spark Memory; spills to disk otherwise.
Spark Memory Manager & Spillable Collection (animated diagram): the collection grows in memory, doubling its reserved execution memory as needed; when the memory manager cannot grant more, the collection spills to disk and the memory is released.
SizeTracker

trait SizeTracker {
  def afterUpdate(): Unit = { … }
  def estimateSize(): Long = { … }
}

afterUpdate() is called on every append: it periodically estimates the collection's size and saves samples. estimateSize() extrapolates from those samples.
Spillable

trait Spillable[C] {
  def spill(inMemCollection: C): Unit  // abstract
  def maybeSpill(currentMemory: Long, inMemCollection: C): Boolean = { /* tries to acquire ×2 memory if needed */ }
}

maybeSpill is called on every append to the collection.
public long acquireExecutionMemory(long required, …) public void releaseExecutionMemory(long size, …) TaskMemoryManager
Custom Spillable Collection
● Be safe with outliers
● Get outside User Memory (25%) and into Spark Memory (75%)
● Spark's APIs here could be a bit friendlier and higher level
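A standalone, simplified sketch of the idea — not Grammarly's actual SpillableBuffer, and without the TaskMemoryManager integration the talk describes (Spillable and SizeTracker are Spark-internal traits). The class name, threshold, and serialization scheme are assumptions; it only uses Spark's public SizeEstimator utility.

import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator

// Append-only buffer that keeps elements in memory while the estimated size stays
// under a fixed threshold, then spills the in-memory chunk to a temp file.
class SimpleSpillableBuffer[T <: AnyRef with Serializable](maxBytesInMemory: Long = 64L * 1024 * 1024) {
  private val inMemory = ArrayBuffer.empty[T]
  private val spillFiles = ArrayBuffer.empty[File]
  private var appendsSinceEstimate = 0

  def append(el: T): Unit = {
    inMemory.append(el)
    appendsSinceEstimate += 1
    // Size estimation walks the object graph, so only re-check periodically.
    if (appendsSinceEstimate >= 1000) {
      appendsSinceEstimate = 0
      if (SizeEstimator.estimate(inMemory) > maxBytesInMemory) spill()
    }
  }

  // Serialize the in-memory chunk to a temp file and clear it.
  private def spill(): Unit = {
    val file = File.createTempFile("spillable-buffer", ".bin")
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try out.writeObject(inMemory.toVector) finally out.close()
    spillFiles.append(file)
    inMemory.clear()
  }

  // Spilled chunks first (in spill order), then the in-memory tail.
  def iterator: Iterator[T] = {
    val spilled = spillFiles.iterator.flatMap { file =>
      val in = new ObjectInputStream(new FileInputStream(file))
      try in.readObject().asInstanceOf[Vector[T]] finally in.close()
    }
    spilled ++ inMemory.iterator
  }
}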
Enrichment 2: Calculable Props
Enrichment Phase 2: Calculable Props { "eventName": "page-visit", "url": "...?utm_medium=ad", "fingerprint": "abc", "attributedUserId": 123, … } { "eventName": "subscribe", "userId": 123, "fingerprint": "abc", … }
Enrichment Phase 2: Calculable Props { "eventName": "page-visit", "url": "...?utm_medium=ad", "fingerprint": "abc", "attributedUserId": 123, … } { "eventName": "subscribe", "userId": 123, "fingerprint": "abc", "firstUtmMedium": "ad", … }
Enrichment Phase 2: Calculable Props — Engine & DSL

val firstUtmMedium: CalcProp[String] =
  (E "url").as[Url]
    .map(_.param("utm_medium"))
    .forEvent("page-visit")
    .first
● Type-safe, functional, composable ● Familiar: similar to Scala collections API ● Batch & Stream (incremental) Enrichment Phase 2: Calculable Props Engine & DSL
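The DSL's implementation is not shown in the talk. Purely as a hypothetical illustration of how combinators with this shape could be modeled — every name and signature below is invented, not taken from Grammarly's engine:

object CalcPropSketch {
  final case class Event(name: String, fields: Map[String, String])

  // A calculable prop: one value derived from a user's whole event stream.
  type CalcProp[A] = Seq[Event] => Option[A]

  // An intermediate "per-event" property; combinators compose descriptions,
  // so the same definition can be evaluated in batch or incrementally.
  trait EventsProp[A] { self =>
    def evalPerEvent(events: Seq[Event]): Seq[Option[A]]

    def map[B](f: A => B): EventsProp[B] = new EventsProp[B] {
      def evalPerEvent(events: Seq[Event]): Seq[Option[B]] =
        self.evalPerEvent(events).map(_.map(f))
    }

    def forEvent(eventName: String): EventsProp[A] = new EventsProp[A] {
      def evalPerEvent(events: Seq[Event]): Seq[Option[A]] =
        events.zip(self.evalPerEvent(events)).map {
          case (e, v) => if (e.name == eventName) v else None
        }
    }

    // Collapse to a single value: the first event where the property is defined.
    def first: CalcProp[A] = events => evalPerEvent(events).flatten.headOption
  }

  // Entry point: read a raw field from every event.
  def E(field: String): EventsProp[String] = new EventsProp[String] {
    def evalPerEvent(events: Seq[Event]): Seq[Option[String]] =
      events.map(_.fields.get(field))
  }

  // e.g. val firstUtmMedium = E("url").map(urlToUtmMedium).forEvent("page-visit").first
}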
Enrichment Pipeline with Spark
Spark Pipeline (diagram): raw Kafka → Stream: Save Raw → Kafka → Stream: User Attr. → user-attributed Kafka → Stream: Calc Props → enriched and queryable; plus Batch: User Attr. and Batch: Calc Props jobs.
Spark Pipeline (diagram): micro-batch streaming jobs and batch jobs move data between Kafka, Cassandra, and Parquet on AWS S3.
● Connectors for everything ● Great for batch ○ Shuffle with spilling ○ Failure recovery ● Great for streaming ○ Fast ○ Low overhead Spark Pipeline
Multiple Output Destinations (diagram): a single job reads from Kafka and writes to Kafka and two Cassandra tables.
Multiple Output Destinations: Try 1

val rdd: RDD[T]
rdd.sendToKafka("topic_x")
rdd.saveToCassandra("table_foo")
rdd.saveToCassandra("table_bar")
Multiple Output Destinations: Try 1 — each output is an action: rdd.saveToCassandra(...) → rdd.foreachPartition(...) → sc.runJob(...)
Multiple Output Destinations: Try 1 (diagram): one job reading from Kafka, writing to Kafka and two Cassandra tables.
Multiple Output Destinations: Try 1 = 3 jobs (diagram): each output re-reads from Kafka — one job writes to Kafka, one to table 1, one to table 2.
Multiple Output Destinations: Try 2

val rdd: RDD[T]
rdd.cache()
rdd.sendToKafka("topic_x")
rdd.saveToCassandra("table_foo")
rdd.saveToCassandra("table_bar")
Multiple Output Destinations: Try 2 = read once, 3 jobs (diagram): the first job reads Kafka and caches; the jobs for table 1 and table 2 read from the cache.
Writer

rdd.foreachPartition { iterator =>
  val writer = new BufferedWriter(
    new OutputStreamWriter(new FileOutputStream(...))
  )
  iterator.foreach { el =>
    writer.write(el.toString)
    writer.newLine()
  }
  writer.close() // makes sure everything is flushed and written
}

● Buffer
● Non-blocking
● Idempotent / Dedupe
AndWriter

val andWriteToX = rdd.mapPartitions { iterator =>
  val writer = new XWriter()
  // Write each element as it flows through; close the writer once the iterator is exhausted.
  iterator.map { el =>
    writer.write(el)
    el
  }.closing(() => writer.close())
}
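`closing` is not a standard Iterator method; a minimal sketch of how such an extension could be written (the name comes from the slide, the implementation is an assumption):

object IteratorSyntax {
  implicit class ClosingIterator[A](underlying: Iterator[A]) {
    // Runs `onClose` exactly once, right after the last element has been consumed.
    def closing(onClose: () => Unit): Iterator[A] = new Iterator[A] {
      private var closed = false
      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !closed) { closed = true; onClose() }
        more
      }
      def next(): A = underlying.next()
    }
  }
}

An andSaveToCassandra-style method (Try 3, next slide) could then be an implicit extension on RDD built from this same pattern.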
Multiple Output Destinations: Try 3

val rdd: RDD[T]
rdd.andSaveToCassandra("table_foo")
  .andSaveToCassandra("table_bar")
  .sendToKafka("topic_x")
Multiple Output Destinations: Try 3 (diagram): a single job reads from Kafka once and writes to Kafka and both Cassandra tables as the data flows through.
And Writer: Kafka, Cassandra, HDFS. Important! Each andWriter consumes resources: memory (buffers) and IO.
capture enrich index query Build Step 3: Index
Index
● Parquet on AWS S3
● Custom partitioning: by eventName and time interval
● Append changes; compact + merge on the fly when querying
● Randomized names to maximize S3 parallelism
● Use s3a for max performance and tweak for S3 read-after-write consistency
● Support flexible schema, even with conflicts!
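A minimal sketch of the kind of partitioned, randomized Parquet layout described above; the bucket, path layout, and write mode are assumptions, not the actual index format.

import java.util.UUID
import org.apache.spark.sql.{DataFrame, SaveMode}

// Append one enriched micro-batch to the index: one prefix per event name and day,
// with a random suffix so concurrent writers never collide and S3 spreads the load
// across key prefixes.
def appendToIndex(events: DataFrame, eventName: String, day: String): Unit = {
  val path = s"s3a://analytics-index/$eventName/$day/${UUID.randomUUID()}"
  events.write.mode(SaveMode.Append).parquet(path)
}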
Some Stats ● Thousands of events per second ● Terabytes of compressed data
capture enrich index query Build Step 4: Query
Hardcore Query
● DataFrames
● Spark SQL (Scala DSL / pure SQL)
● Zeppelin
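For example, the daily-active-users report from the beginning of the talk could be written roughly like this in a notebook; the path and the timestamp column are assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("dau-report").getOrCreate()

// Read indexed page-visit events and count unique attributed users per day.
val events = spark.read.parquet("s3a://analytics-index/page-visit")
val dau = events
  .withColumn("day", to_date(col("timestamp")))
  .groupBy("day")
  .agg(countDistinct("attributedUserId").as("dailyActiveUsers"))
  .orderBy("day")

dau.show()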
● Plot by day ● Unique visitors ● Filter by country ● Split by traffic source (top 20) ● Time from 2 weeks ago to today Casual Query
Option 1: SQL Quickly gets complex
Option 2: UI — too expensive to build, extend, and support
Option 3: Custom Query Language

SEGMENT "eventName"
WHERE foo = "bar" AND x.y IN ("a", "b", "c")
UNIQUE BY m IS NOT NULL
TIME from 2 months ago to today
STEP 1 month
SPAN 1 week

The WHERE and UNIQUE BY clauses are built from expressions.
● Segment, Funnel, Retention ● UI & as DataFrame in Zeppelin ● Spark <= 1.6 – Scala Parser Combinators ● Reuse most complex part of expression parser ● Relatively extensible Option 3: Custom Query Language
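As an illustration of the Scala Parser Combinators approach mentioned above, a toy parser for just the SEGMENT/WHERE fragment might look like this; the grammar and AST are simplified assumptions, not the real query language.

import scala.util.parsing.combinator.JavaTokenParsers

sealed trait Expr
case class Eq(field: String, value: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Segment(eventName: String, where: Option[Expr])

object ToyQueryParser extends JavaTokenParsers {
  // stringLiteral matches a double-quoted string, quotes included; strip them.
  private def quoted: Parser[String] = stringLiteral ^^ (s => s.substring(1, s.length - 1))

  private def eqExpr: Parser[Expr] = ident ~ ("=" ~> quoted) ^^ { case f ~ v => Eq(f, v) }

  // One or more equality filters joined by AND, folded left into a tree.
  private def expr: Parser[Expr] = rep1sep(eqExpr, "AND") ^^ (_.reduceLeft(And))

  private def segment: Parser[Segment] =
    ("SEGMENT" ~> quoted) ~ opt("WHERE" ~> expr) ^^ { case name ~ where => Segment(name, where) }

  def parse(q: String): Either[String, Segment] = parseAll(segment, q) match {
    case Success(result, _) => Right(result)
    case failure: NoSuccess => Left(failure.msg)
  }
}

// e.g. ToyQueryParser.parse("SEGMENT \"page-visit\" WHERE country = \"US\" AND utm_medium = \"ad\"")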
Option 3: Custom Query Language
Conclusion
● Custom versatile analytics is doable and enjoyable
● Spark is a great platform to build analytics on top of
  ○ Enrichment pipeline: batch / streaming, query, ML
● It would be great to see even the deep internals become slightly more extensible
We are hiring! olivia@grammarly.com https://www.grammarly.com/jobs
Thank you! Questions?
