Apache Flink
Stephan Ewen, Flink committer, co-founder / CTO @ data Artisans, @StephanEwen
Looking back one year 2
April 16, 2014 3
Stratosphere 0.4 4
[Architecture diagram labels: Pact API (Java), DataSet API (Scala), Stratosphere Optimizer, Stratosphere Runtime, Local, Remote]
Batch processing on a pipelining engine, with iterations …
Looking at now… 5
What is Apache Flink? (master)
[Diagram: Flink in the center, fed by two kinds of input]
• Real-time data streams: event logs from Kafka, RabbitMQ, ... for low latency, windowing, aggregations, ...
• Historic data: HDFS, JDBC, ... for ETL, graphs, machine learning, relational, …
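What is Apache Flink? 7
[Stack diagram labels, top to bottom]
• Libraries and compatibility layers: Python, Gelly, Table, ML, SAMOA, Dataflow, Hadoop M/R
• Core APIs: DataSet (Java/Scala) with the Flink Optimizer, DataStream (Java/Scala) with the Stream Builder
• Runtime: Flink Dataflow Runtime
• Deployment: Local, Remote, Yarn, Tez, Embedded
• Storage and sources: HDFS, HBase, Kafka, RabbitMQ, Flume, HCatalog, JDBC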
Batch / Streaming APIs 8
    case class Word (word: String, frequency: Int)
DataStream API (streaming):
    val lines: DataStream[String] = env.fromSocketStream(...)
    lines.flatMap { line => line.split(" ").map(word => Word(word, 1)) }
      .window(Count.of(1000)).every(Count.of(100))
      .groupBy("word").sum("frequency")
      .print()
DataSet API (batch):
    val lines: DataSet[String] = env.readTextFile(...)
    lines.flatMap { line => line.split(" ").map(word => Word(word, 1)) }
      .groupBy("word").sum("frequency")
      .print()
Technology inside Flink
    case class Path (from: Long, to: Long)
    val tc = edges.iterate(10) {
      paths: DataSet[Path] =>
        val next = paths
          .join(edges)
          .where("to")
          .equalTo("from") {
            (path, edge) => Path(path.from, edge.to)
          }
          .union(paths)
          .distinct()
        next
    }
[Diagram: the program becomes a dataflow graph (DataSource orders.tbl, Filter, Map, DataSource lineitem.tbl, Join with Hybrid Hash build HT / probe, hash-part [0] on both inputs, GroupRed with sort, forward) that passes through three stages]
• Pre-flight (Client): cost-based optimizer, type extraction stack
• Master: task scheduling, recovery metadata; deploys operators and tracks intermediate results
• Workers: memory manager, out-of-core algos, batch & streaming, state & checkpoints
Flink by Feature / Use Case 10
Data Streaming Analysis 11
Life of data streams 12
• Create: create streams from event sources (machines, databases, logs, sensors, …)
• Collect: collect and make streams available for consumption (e.g., Apache Kafka)
• Process: process streams, possibly generating derived streams (e.g., Apache Flink)
Stream Analysis in Flink 13 More at: http://flink.apache.org/news/2015/02/09/streaming-example.html
Defining windows in Flink 14
• Trigger policy
    • When to trigger the computation on current window
• Eviction policy
    • When data points should leave the window
    • Defines window width/size
• E.g., count-based policy
    • evict when #elements > n
    • start a new window every n-th element
• Built-in: Count, Time, Delta policies
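The two window policies can be illustrated without any Flink dependency. The following is a minimal sketch in plain Scala, assuming a count-based eviction policy (keep at most `size` elements) and a count-based trigger policy (fire every `slide` elements); `CountWindowSketch` and `slidingCountWindows` are hypothetical names for illustration, not Flink API.

```scala
object CountWindowSketch {
  /** Fires the current window contents every `slide` elements,
    * keeping at most the `size` most recent elements. */
  def slidingCountWindows[A](input: Seq[A], size: Int, slide: Int): Vector[Vector[A]] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    var buffer = Vector.empty[A]
    var sinceLastTrigger = 0
    val fired = Vector.newBuilder[Vector[A]]
    for (element <- input) {
      buffer = buffer :+ element
      // Eviction policy: drop the oldest elements once #elements > size.
      if (buffer.size > size) buffer = buffer.drop(buffer.size - size)
      sinceLastTrigger += 1
      // Trigger policy: run the computation on every `slide`-th element.
      if (sinceLastTrigger == slide) {
        fired += buffer
        sinceLastTrigger = 0
      }
    }
    fired.result()
  }

  def main(args: Array[String]): Unit = {
    val windows = slidingCountWindows(1 to 7, size = 4, slide = 2)
    windows.foreach(w => println(w.mkString("[", ", ", "]")))
  }
}
```

With `size = 4` and `slide = 2` over the elements 1 to 7, the sketch fires three windows: `[1, 2]`, `[1, 2, 3, 4]`, and `[3, 4, 5, 6]`. This mirrors the shape of `.window(Count.of(1000)).every(Count.of(100))` from the streaming example, at a smaller scale.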
Checkpointing / Recovery 15
• Flink acknowledges batches of records
    • Less overhead in failure-free case
    • Currently tied to fault-tolerant data sources (e.g., Kafka)
• Flink operators can keep state
    • State is checkpointed
    • Checkpointing and record acks go together
• Exactly-once semantics for state
Checkpointing / Recovery 16
Chandy-Lamport algorithm for consistent asynchronous distributed snapshots
Pushes checkpoint barriers through the data flow
[Diagram: a barrier travelling through the data stream past an operator; operator states shown: checkpoint starting, checkpoint in progress (backup till next snapshot), checkpoint done]
• Before barrier = part of the snapshot
• After barrier = not in snapshot
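The barrier rule (records before the barrier belong to the snapshot, records after it do not) can be sketched for a single operator. This is a simplified, single-threaded illustration in plain Scala, not Flink's implementation: `SummingOperator`, `Record`, `Barrier`, and `recover` are hypothetical names, and the replay step assumes a source that can re-deliver the records after a barrier (as a fault-tolerant source such as Kafka can).

```scala
object BarrierSnapshotSketch {
  sealed trait Event
  final case class Record(value: Long) extends Event
  final case class Barrier(checkpointId: Int) extends Event

  /** A stateful operator that keeps a running sum and snapshots it at each barrier. */
  final class SummingOperator(initial: Long = 0L) {
    private var sum: Long = initial
    private var snapshots: Map[Int, Long] = Map.empty

    def process(event: Event): Unit = event match {
      case Record(v) => sum += v
      // Everything processed before the barrier is part of this snapshot.
      case Barrier(id) => snapshots = snapshots.updated(id, sum)
    }

    def currentSum: Long = sum
    def snapshot(id: Int): Option[Long] = snapshots.get(id)
  }

  /** Restores the state of checkpoint `id` and replays only the records after its barrier. */
  def recover(stream: Seq[Event], id: Int, snapshotValue: Long): Long = {
    val restored = new SummingOperator(snapshotValue)
    val afterBarrier = stream.dropWhile(_ != Barrier(id)).drop(1)
    afterBarrier.foreach(e => restored.process(e))
    restored.currentSum
  }

  def main(args: Array[String]): Unit = {
    val stream = Seq[Event](Record(1), Record(2), Barrier(1), Record(3), Record(4))
    val op = new SummingOperator()
    stream.foreach(e => op.process(e))
    val snap = op.snapshot(1).get
    println(s"snapshot=$snap live=${op.currentSum} recovered=${recover(stream, 1, snap)}")
  }
}
```

Because the restored state plus the replayed records gives the same sum as the failure-free run, each record affects the state exactly once. A distributed version additionally has to align barriers across multiple input channels, which this sketch leaves out.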
Heavy ETL Pipelines 17
Heavy Data Pipelines 18 Complex ETL programs Apology: Graph had to be blurred for online slides, due to confidentiality
Memory Management 19
    public class WC {
        public String word;
        public int count;
    }
[Diagram labels: Managed / Unmanaged memory; Pool of Memory Pages with empty pages; Sorting, hashing, caching; Shuffling, broadcasts; User code objects]
Flink contains its own memory management stack. Memory is allocated, de-allocated, and used strictly using an internal buffer pool implementation. To do that, Flink contains its own type extraction and serialization components.
More at: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
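The idea of keeping records as serialized bytes in a fixed pool of pages, rather than as objects on the heap, can be sketched with `java.nio.ByteBuffer`. This is an illustrative toy, not Flink's memory manager: `PagePool`, `write`, and `readAll` are hypothetical names, and the record layout (length-prefixed UTF-8 word followed by the count) is an assumption made for the example.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object ManagedMemorySketch {
  final case class WC(word: String, count: Int)

  /** A fixed pool of equally sized pages, allocated once up front. */
  final class PagePool(numPages: Int, pageSize: Int) {
    private var free: List[ByteBuffer] = List.fill(numPages)(ByteBuffer.allocate(pageSize))

    def request(): Option[ByteBuffer] = free match {
      case page :: rest =>
        free = rest
        page.clear()
        Some(page)
      case Nil => None // pool exhausted: a real system would spill to disk here
    }

    def release(page: ByteBuffer): Unit = { free = page :: free }
    def available: Int = free.size
  }

  /** Serializes a record into the page; returns false if it does not fit. */
  def write(page: ByteBuffer, record: WC): Boolean = {
    val bytes = record.word.getBytes(StandardCharsets.UTF_8)
    val needed = 4 + bytes.length + 4 // length prefix + word bytes + count
    if (page.remaining() < needed) false
    else {
      page.putInt(bytes.length).put(bytes).putInt(record.count)
      true
    }
  }

  /** Deserializes every record written to the page so far. */
  def readAll(page: ByteBuffer): Vector[WC] = {
    val view = page.duplicate() // independent position/limit, shared content
    view.flip()
    val out = Vector.newBuilder[WC]
    while (view.hasRemaining()) {
      val bytes = new Array[Byte](view.getInt())
      view.get(bytes)
      out += WC(new String(bytes, StandardCharsets.UTF_8), view.getInt())
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val pool = new PagePool(numPages = 2, pageSize = 32)
    val page = pool.request().get
    Seq(WC("flink", 1), WC("stream", 2), WC("batch", 3)).foreach { r =>
      if (!write(page, r)) println(s"page full, $r needs another page")
    }
    println(readAll(page))
  }
}
```

Since the memory budget is a fixed number of pages, running out of space is an explicit, recoverable condition (request another page or go to disk) instead of an `OutOfMemoryError`, which is the behaviour the slide attributes to managed memory.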
Smooth out-of-core performance 20
Single-core join of 1KB Java objects beyond memory (4 GB)
[Chart: blue bars are in-memory, orange bars (partially) out-of-core]
More at: http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
Benefits of managed memory 21
• More reliable and stable performance (fewer GC effects, easy to go to disk)
Table API 22
    val customers = env.readCsvFile(…).as('id, 'mktSegment)
      .filter( 'mktSegment === "AUTOMOBILE" )
    val orders = env.readCsvFile(…)
      .filter( o => dateFormat.parse(o.orderDate).before(date) )
      .as('orderId, 'custId, 'orderDate, 'shipPrio)
    val items = orders
      .join(customers).where('custId === 'id)
      .join(lineitems).where('orderId === 'id)
      .select('orderId, 'orderDate, 'shipPrio,
        'extdPrice * (Literal(1.0f) - 'discount) as 'revenue)
    val result = items
      .groupBy('orderId, 'orderDate, 'shipPrio)
      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
Iterations in Data Flows 23
• Machine Learning Algorithms
Iterate by looping 24
• for/while loop in client submits one job per iteration step
• Data reuse by caching in memory and/or disk
[Diagram: a Client submitting a sequence of Step jobs, one per iteration]
Iterate in the Dataflow 25
[Diagram labels: initial solution, partial solution, step function (operators X and Y, other datasets), new partial solution, Replace, iteration result]
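The semantics of a bulk iteration (the step function's result replaces the partial solution in every round) can be sketched on local collections. This is a plain-Scala analogy to the `edges.iterate(10) { ... }` transitive-closure program shown earlier in the deck, not Flink code: `iterate` and `transitiveClosure` are hypothetical helpers, and a local fold stands in for a loop that Flink runs inside the dataflow.

```scala
object BulkIterationSketch {
  final case class Path(from: Long, to: Long)

  /** Applies `step` to the partial solution `maxIterations` times,
    * replacing the partial solution with the step result each round. */
  def iterate[A](initial: A, maxIterations: Int)(step: A => A): A =
    (1 to maxIterations).foldLeft(initial)((partial, _) => step(partial))

  /** Transitive closure: join paths with edges on to == from, then union and de-duplicate. */
  def transitiveClosure(edges: Set[Path], maxIterations: Int): Set[Path] =
    iterate(edges, maxIterations) { paths =>
      val extended = for {
        path <- paths
        edge <- edges
        if path.to == edge.from
      } yield Path(path.from, edge.to)
      extended ++ paths // Set semantics give the distinct() for free
    }

  def main(args: Array[String]): Unit = {
    val edges = Set(Path(1, 2), Path(2, 3), Path(3, 4))
    transitiveClosure(edges, maxIterations = 10).toSeq
      .sortBy(p => (p.from, p.to))
      .foreach(println)
  }
}
```

For the chain 1 → 2 → 3 → 4 the result contains the three original edges plus the derived paths 1 → 3, 2 → 4, and 1 → 4. Every round recomputes the whole partial solution, even the parts that no longer change.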
Large-Scale Machine Learning 26 Factorizing a matrix with 28 billion ratings for recommendations (Scale of Netflix or Spotify) More at: http://data-artisans.com/computing-recommendations-with-flink.html
State in Iterations 27
• Graphs and Machine Learning
Iterate natively with deltas 28
[Diagram labels: initial solution, initial workset, partial solution, workset, step functions A and B (operators X and Y, other datasets), delta set, Merge deltas, Replace (workset), iteration result]
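A delta iteration keeps a solution set as state and only processes a workset of recently changed elements. The following plain-Scala sketch shows the pattern for connected components (each vertex converges to the smallest vertex id in its component). It is an illustration under assumptions, not Flink's delta-iteration API: `connectedComponents` is a hypothetical helper and the graph is treated as undirected.

```scala
object DeltaIterationSketch {
  /** Returns the component id of every vertex and the number of updates per iteration. */
  def connectedComponents(
      vertices: Set[Long],
      edges: Set[(Long, Long)]): (Map[Long, Long], Vector[Int]) = {
    // Undirected adjacency lists.
    val neighbours: Map[Long, Set[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

    var solution: Map[Long, Long] = vertices.map(v => v -> v).toMap // solution set
    var workset: Map[Long, Long] = solution                          // initial workset
    var updatesPerIteration = Vector.empty[Int]

    while (workset.nonEmpty) {
      // Step function: only changed vertices propose their component id to neighbours.
      val candidates: Map[Long, Long] = workset.toSeq
        .flatMap { case (v, comp) =>
          neighbours.getOrElse(v, Set.empty[Long]).toSeq.map(n => n -> comp)
        }
        .groupBy(_._1)
        .map { case (n, proposals) => n -> proposals.map(_._2).min }
      // Delta set: keep only proposals that improve on the current solution.
      val delta = candidates.filter { case (n, comp) => solution.get(n).exists(comp < _) }
      solution = solution ++ delta // merge deltas into the solution set
      workset = delta              // the delta replaces the workset
      updatesPerIteration = updatesPerIteration :+ delta.size
    }
    (solution, updatesPerIteration)
  }

  def main(args: Array[String]): Unit = {
    val (components, updates) =
      connectedComponents(Set(1L, 2L, 3L, 4L, 5L), Set((1L, 2L), (2L, 3L), (4L, 5L)))
    println(components.toSeq.sorted.mkString(", "))
    println(s"updates per iteration: ${updates.mkString(", ")}")
  }
}
```

On this five-vertex graph the number of updated elements per iteration is 3, then 1, then 0. The work per round shrinks with the workset, which is the effect a delta iteration is designed to exploit.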
Effect of delta iterations…
[Chart: # of elements updated (y-axis, 0 to 45,000,000) per iteration (x-axis, 1 to 61)]
… very fast graph analysis 30
Performance competitive with dedicated graph analysis systems
… and mix and match ETL-style and graph analysis in one program
More at: http://data-artisans.com/data-analysis-with-flink.html
Closing 31
Flink Roadmap for 2015 32
• Out-of-core state in Streaming
• Monitoring and scaling for streaming
• Streaming Machine Learning with SAMOA
• More additions to the libraries
    • Batch Machine Learning
    • Graph library additions (more algorithms)
• SQL on top of expression language
• Master failover
Flink community
[Chart: # unique contributor ids by git commits (y-axis, 0 to 120) over time (x-axis, Aug-10 to Jul-15)]
flink.apache.org @ApacheFlink
35 Backup
Cornerpoints of Flink Design 36
Robust Algorithms on Managed Memory
Pipelined Execution of Batch Programs
• Better shuffle performance
• No OutOfMemory Errors
• Scales to very large JVMs
• Efficient and robust processing
Flexible Data Streaming Engine
• Low Latency Stream Proc.
• Highly flexible windows
Native Iterations
• Very fast Graph Processing
• Stateful Iterations for ML
High-level APIs, beyond key/value pairs
• Java/Scala/Python (upcoming)
• Relational-style optimizer
• Graphs / Machine Learning
• Streaming ML (coming)
• Scales to very large groups
Active Library Development
Program optimization 37
A simple program 38
    val orders = …
    val lineitems = …
    val filteredOrders = orders
      .filter(o => dateFormat.parse(o.shipDate).after(date))
      .filter(o => o.shipPrio > 2)
    val lineitemsOfOrders = filteredOrders
      .join(lineitems)
      .where("orderId").equalTo("orderId")
      .apply((o, l) => new SelectedItem(o.orderDate, l.extdPrice))
    val priceSums = lineitemsOfOrders
      .groupBy("orderDate").sum("l.extdPrice")
Two execution plans 39
[Plan diagram 1 labels: DataSource orders.tbl, Filter, Map, DataSource lineitem.tbl, Join (Hybrid Hash: build HT, probe), broadcast, forward, Combine, GroupRed, sort]
[Plan diagram 2 labels: DataSource orders.tbl, Filter, Map, DataSource lineitem.tbl, Join (Hybrid Hash: build HT, probe), hash-part [0], hash-part [0], hash-part [0,1], GroupRed, sort, forward]
Best plan depends on relative sizes of input files
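Why the best plan depends on the input sizes can be shown with a toy cost comparison between broadcasting one join input and hash-partitioning both. The cost formulas below are a deliberately crude assumption for illustration (bytes shipped over the network only) and are not Flink's actual cost model; `ShipStrategy`, `networkCost`, and `cheapest` are hypothetical names.

```scala
object JoinStrategySketch {
  sealed trait ShipStrategy
  case object BroadcastLeft extends ShipStrategy
  case object BroadcastRight extends ShipStrategy
  case object RepartitionBoth extends ShipStrategy

  /** Hypothetical network cost in bytes; a stand-in for a real cost model. */
  def networkCost(
      strategy: ShipStrategy,
      leftBytes: Long,
      rightBytes: Long,
      parallelism: Int): Long = strategy match {
    // Broadcasting sends a full copy of one input to every parallel join task.
    case BroadcastLeft => leftBytes * parallelism
    case BroadcastRight => rightBytes * parallelism
    // Hash-partitioning ships (roughly) each input once.
    case RepartitionBoth => leftBytes + rightBytes
  }

  /** Picks the strategy with the lowest estimated network cost. */
  def cheapest(leftBytes: Long, rightBytes: Long, parallelism: Int): ShipStrategy =
    Seq[ShipStrategy](BroadcastLeft, BroadcastRight, RepartitionBoth)
      .minBy(s => networkCost(s, leftBytes, rightBytes, parallelism))

  def main(args: Array[String]): Unit = {
    val mb = 1L << 20
    val gb = 1L << 30
    println(cheapest(10 * mb, 10 * gb, parallelism = 8)) // small filtered input vs large input
    println(cheapest(10 * gb, 10 * gb, parallelism = 8)) // two inputs of similar size
  }
}
```

Under these assumptions a 10 MB input joined with a 10 GB input favours broadcasting the small side, while two 10 GB inputs favour partitioning both. A real optimizer also weighs local strategies (hash versus sort-merge, build versus probe side) and whether an existing partitioning can be reused.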
Examples of optimization 40
• Task chaining
    • Coalesce map/filter/etc tasks
• Join optimizations
    • Broadcast/partition, build/probe side, hash or sort-merge
• Interesting properties
    • Re-use partitioning and sorting for later operations
• Automatic caching
    • E.g., for iterations
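Task chaining can be pictured as fusing consecutive per-record functions into one, so that a record passes through filter and map in a single call instead of being handed from task to task. The sketch below is plain Scala under that assumption; `Task`, `mapTask`, `filterTask`, and `chain` are hypothetical names, not Flink classes.

```scala
object TaskChainingSketch {
  /** A chainable per-record task: zero or more outputs for each input record. */
  type Task[A, B] = A => Iterator[B]

  def mapTask[A, B](f: A => B): Task[A, B] =
    a => Iterator.single(f(a))

  def filterTask[A](p: A => Boolean): Task[A, A] =
    a => if (p(a)) Iterator.single(a) else Iterator.empty

  /** Coalesces two tasks into one: outputs of `first` feed `second` directly. */
  def chain[A, B, C](first: Task[A, B], second: Task[B, C]): Task[A, C] =
    a => first(a).flatMap(second)

  def main(args: Array[String]): Unit = {
    val pipeline = chain(filterTask[Int](_ % 2 == 0), mapTask[Int, Int](_ * 10))
    println((1 to 6).iterator.flatMap(pipeline).toList)
  }
}
```

Running the chained pipeline over 1 to 6 keeps the even numbers and multiplies them by ten, giving `List(20, 40, 60)`, without materializing the intermediate filtered collection.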
Visualization 41
Visualization tools 42
Visualization tools 43
Visualization tools 44

Apache Flink - Overview and Use cases of a Distributed Dataflow System (at pre-Hadoop Summit Meetups)


Editor's Notes

  • #5 Iterations, Yarn support, Local execution, accumulators, web frontend, HBase, JDBC, Windows compatibility, mvn central,
  • #34 dev list: 300-400 messages/month. record 1000 messages on