Scala Functional Message Processing: LambdaFlow
John Nestor, 47 Degrees
www.47deg.com
https://github.com/47deg/LambdaFlow
May 9, 2017

47deg.com © Copyright 2017 47 Degrees

Outline
• Introduction
• Examples
  • Simple
  • Windowing
  • Indexing and Query
  • Graph
  • Temperature
  • Page Rank
• Questions

Introduction

Inspiration
• Wanted to build a reactive system using Akka clustering
• Didn't like Spark's micro-batching; wanted message-at-a-time processing
• Wanted a functional Scala DSL, like Spark's Scala DSL

LambdaFlow
• Near-real-time message processing
• One message at a time
• Supports both set (create or update) and delete
• Written in Scala and Akka
• Two parts:
  • Functional DSL
  • Akka Cluster reactive implementation

LambdaFlow Use Cases
• Near-real-time streaming of infinite streams
• Finite streams
• Batch processing
• Inputs and outputs via
  • files
  • API
  • Kafka
  • …

Possible Use Case
[Diagram: Kafka input topics In 1 … In M feed LambdaFlow, which writes to Kafka output topics Out 1 … Out N]

Functional DSL
• Directed graph
• Strongly typed
• Graph nodes are actions
• Graph arcs are message-passing paths
• Graph can contain cycles

Key-Value Model
• Most actions contain an in-memory key-value HashMap[K,V]
• Two messages:
  • Set (create or update) key k1 to value v1
  • Delete the key-value pair for key k1

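A minimal sketch of an action's key-value state, assuming the two messages are modeled as a small Scala ADT. The names KvMsg, SetKV, DeleteKV and KvStore are hypothetical and not LambdaFlow's API.

```scala
import scala.collection.mutable

// Hypothetical message types for the two messages above (not LambdaFlow's API).
sealed trait KvMsg[K, V]
final case class SetKV[K, V](key: K, value: V) extends KvMsg[K, V]
final case class DeleteKV[K, V](key: K) extends KvMsg[K, V]

// An action's in-memory state, updated one message at a time.
final class KvStore[K, V] {
  private val kv = mutable.HashMap.empty[K, V]

  // Apply one message; returns the previous value for the key, if any.
  def receive(msg: KvMsg[K, V]): Option[V] = msg match {
    case SetKV(k, v) => kv.put(k, v) // create or update
    case DeleteKV(k) => kv.remove(k) // delete the pair for k
  }

  def snapshot: Map[K, V] = kv.toMap
}
```

Because a set on an existing key replaces its value, create and update need only one message type.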
Combining Actions
• Most actions have
  • Input: sets or deletes keys in the action's KV store
  • Output: sets or deletes keys in the next action's KV store
  • Operation: maps this action's KV pairs to the next action's KV pairs

Functional Eventual Consistency
• Two actions A1[K1,V1] and A2[K2,V2]
• Each has a KV store: A1.kv[K1,V1] and A2.kv[K2,V2]
• The output of A1 is the input to A2
• A1 has an operation A1.f: (K1,V1) => Set[(K2,V2)]
• If all changes to A1 stop, then eventually
  • A2.kv = A1.kv.flatMap(A1.f)

Incremental Operation
• Suppose an action holds k1->v1 and we now set k1->v2
• To update the next action:
  • Undo the effect of k1->v1
  • Apply the effect of k1->v2

Incremental Operation Example
• Suppose the action's keys and values are both Strings
• Change "A"->"B" to "A"->"C"
• Operation example 1 (map): (k,v) => (k+v, v+v)
  • Delete "AB"->"BB"
  • Set "AC"->"CC"
• Operation example 2 (mapValue): (k,v) => (k, v+v)
  • Delete "A"->"BB" (not needed here, because the set below overwrites key "A")
  • Set "A"->"CC"

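The undo/redo rule can be sketched on plain Scala maps. This assumes a flatMap-style operation (K1,V1) => Set[(K2,V2)] whose output keys never collide across different input keys (many-to-one grouping is the job of Reduce); the names Incremental, delta, SetMsg, DelMsg and applyMsgs are hypothetical. It reproduces both examples above, including skipping the unneeded delete of "A", and the resulting map matches a batch flatMap, as the eventual-consistency property requires.

```scala
// Sketch of incremental propagation through a flatMap-style operation.
// Assumption: different input keys never produce the same output key.
object Incremental {
  sealed trait Msg[K, V]
  final case class SetMsg[K, V](key: K, value: V) extends Msg[K, V]
  final case class DelMsg[K, V](key: K) extends Msg[K, V]

  // Downstream messages when input key k changes from oldV to newV (None = absent):
  // undo the old effect, then apply the new one. A delete is skipped when the
  // new effect sets the same output key anyway.
  def delta[K1, V1, K2, V2](f: (K1, V1) => Set[(K2, V2)])(
      k: K1, oldV: Option[V1], newV: Option[V1]): List[Msg[K2, V2]] = {
    val before = oldV.map(v => f(k, v)).getOrElse(Set.empty[(K2, V2)])
    val after = newV.map(v => f(k, v)).getOrElse(Set.empty[(K2, V2)])
    val afterKeys = after.map(_._1)
    val deletes: List[Msg[K2, V2]] =
      before.toList.collect { case (k2, _) if !afterKeys(k2) => DelMsg[K2, V2](k2) }
    val sets: List[Msg[K2, V2]] = after.toList.map { case (k2, v2) => SetMsg(k2, v2) }
    deletes ++ sets
  }

  // Apply downstream messages to the next action's KV store.
  def applyMsgs[K, V](kv: Map[K, V], msgs: List[Msg[K, V]]): Map[K, V] =
    msgs.foldLeft(kv) {
      case (m, SetMsg(k, v)) => m.updated(k, v)
      case (m, DelMsg(k))    => m - k
    }
}
```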
Implementations
• Class based
  • Uses method calls rather than actor messages
• Local Akka actor based
• Akka Cluster based
  • Multinode
  • Reactive: scalable, responsive, fault-tolerant

Simple Example

Simple Example
• Input data:
  • 8328 tweets
  • Look at the input data (capture.json)
• Count the number of tweets in each language
• Uses method calls between actions

Simple Example
[Diagram: Src (UID => Lang) → Map ((UID,Lang) => ()) → Reduce (sum per Lang) → Sink (Lang => Count)]

Simple Example Code

    val src = Source[String, String]("src")     // UID => Lang
    val counts = Sink[String, Long]("counts")   // Lang => Count

    val f1 = src
      // Move the language into the key: (UID, Lang) => ()
      .map("map", (uid, lang) => ((uid, lang), ()))
      // Group by language (_._2 of the key), count 1 per tweet, and sum
      .reduce("red", _._2, (k, v) => 1L, LongSum)
      .sink(counts)

    run(f1)

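To see what f1 computes, here is the same pipeline written with ordinary Scala collections (Scala 2.13+ for groupMapReduce). This is not the LambdaFlow API; SimpleCount and countByLang are hypothetical names, and the input is assumed to be a map from tweet UID to language, as in Source[String, String]("src").

```scala
// Plain Scala collections, not LambdaFlow: reproduces the result of the
// Simple Example's map + reduce (number of tweets per language).
object SimpleCount {
  def countByLang(src: Map[String, String]): Map[String, Long] =
    src
      .map { case (uid, lang) => ((uid, lang), ()) }             // "map": key becomes (UID, Lang)
      .groupMapReduce { case (key, _) => key._2 }(_ => 1L)(_ + _) // "red": group by Lang, 1 per tweet, sum
}
```

The difference in LambdaFlow is that the same result is maintained incrementally as each tweet is set or deleted, instead of being recomputed over the whole input.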
Source and Sink Actions
• Source[K,V]
  • gets its input from outside the graph (e.g. Kafka)
  • outputs to other actions
• Sink[K,V]
  • gets its input from other actions
  • outputs to outside the graph (e.g. Kafka)

Map Operation
• Action input type K,V
• Action output type KOUT,VOUT
• Action function type: (K,V) => (KOUT,VOUT)

Reduce Operation
• Action input type K,V
• Action output type KOUT,VOUT
• Grouping function type: K => KOUT (many to one)
• Value function type: (K,V) => VOUT
• Reduce operation type: (VOUT,VOUT) => VOUT
  • must be commutative and associative, with inverses (an abelian group)

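The inverse is what lets Reduce handle deletes and updates one message at a time: the old contribution is subtracted and the new one added, without rescanning the group. A sketch, assuming hypothetical AbelianGroup, LongSumGroup and IncrementalReduce types (the slides only name LongSum):

```scala
// Hypothetical abelian-group interface; LongSumGroup plays the role of LongSum.
trait AbelianGroup[V] {
  def zero: V
  def combine(a: V, b: V): V
  def inverse(a: V): V
}

object LongSumGroup extends AbelianGroup[Long] {
  def zero: Long = 0L
  def combine(a: Long, b: Long): Long = a + b
  def inverse(a: Long): Long = -a
}

// Incremental reduce: each set/delete patches only the affected group's result.
final class IncrementalReduce[K, V, KOUT, VOUT](
    group: K => KOUT,      // grouping function (many to one)
    value: (K, V) => VOUT, // value function
    op: AbelianGroup[VOUT]) {
  private var inputs = Map.empty[K, V]
  private var results = Map.empty[KOUT, VOUT]

  // Set (Some) or delete (None) one input key; returns the updated group result.
  // A group whose inputs are all deleted is simply left at zero in this sketch.
  def update(k: K, v: Option[V]): (KOUT, VOUT) = {
    val g = group(k)
    val current = results.getOrElse(g, op.zero)
    val withoutOld = inputs.get(k).fold(current)(old => op.combine(current, op.inverse(value(k, old))))
    val updated = v.fold(withoutOld)(nv => op.combine(withoutOld, value(k, nv)))
    inputs = v.fold(inputs - k)(nv => inputs.updated(k, nv))
    results = results.updated(g, updated)
    (g, updated)
  }

  def result: Map[KOUT, VOUT] = results
}
```

An operation such as max is commutative and associative but has no inverse, so a delete could not be undone this way without keeping the whole group.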
Simple Example
• Run it (demo.simple)
• Look at the code (Simple.scala)

Windowing Example

Windowing Example
• Streaming data
  • 1000 tweets
  • sent in at ~20 per second
  • each tweet expires after 20 seconds
• Count languages and find the 20 most common words
• Uses the local actor implementation
• Uses a Vaadin-based visualization tool

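Expiry fits the set/delete model directly: a tweet is a Set when it arrives and a Delete 20 seconds later, so the incremental counts always cover the last 20 seconds. A sketch on plain collections, where Tweet, Arrive, Expire and countsAt are hypothetical names and time is simulated by replaying events in time order rather than by a real clock:

```scala
// Sliding-window sketch: arrival = set, arrival + 20 s = delete.
object Window {
  final case class Tweet(id: String, lang: String, arrivalSec: Double)

  sealed trait Event { def time: Double }
  final case class Arrive(time: Double, t: Tweet) extends Event
  final case class Expire(time: Double, t: Tweet) extends Event

  val WindowSec = 20.0

  // Language counts after processing every event with time <= now.
  def countsAt(tweets: Seq[Tweet], now: Double): Map[String, Long] = {
    val events: Seq[Event] =
      tweets.flatMap(t => Seq(Arrive(t.arrivalSec, t), Expire(t.arrivalSec + WindowSec, t)))
    events
      .filter(_.time <= now)
      .sortBy(_.time)
      .foldLeft(Map.empty[String, Long]) {
        case (counts, Arrive(_, t)) => counts.updated(t.lang, counts.getOrElse(t.lang, 0L) + 1)
        case (counts, Expire(_, t)) =>
          val n = counts.getOrElse(t.lang, 0L) - 1
          if (n == 0) counts - t.lang else counts.updated(t.lang, n) // drop languages with no live tweets
      }
  }
}
```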
Windowing Example
[Diagram: Lang → Reduce → Count; Count → Reduce → Map → Total; Count and Total → Join → Percent; Txt → FlatMap → Reduce → Map → First → Map → Words]

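Windowing Example Code

    // Language counts: group tweets by language (_._1 of the key) and count them
    val f1 = src
      .reduce("red", _._1, (k, v) => 1L, LongSum)
      .to(counts1, counts2)   // broadcast the counts to two labels
      .sink(counts)

    // Total number of tweets: collapse every language onto one key and sum
    val f2 = counts1
      .reduce("red1", _ => List.empty[String], (k, v) => v, LongSum)
      .map[String, Long]("map1", (k, v) => ("total", v))
      .to(total1)
      .sink(total)

    // Percentage per language: both key functions map to Unit,
    // so every language count is paired with the single total
    val f3 = counts2
      .join[String, Long, Unit, String, String]("product", total1, (s: String) => (), (s: String) => ()) {
        case (lang, count, t, total) => Set((lang, f"${count * 100 / total}%02d$PERCENT"))
      }
      .sink(percent)

    // 20 most common words
    val f4 = text
      // Split each tweet into (word, tweet id, position) keys
      .flatMap("GetWords", (k, v) => {
        val (text, uuid) = k
        getWords(text)
          .zipWithIndex
          .map { case (word, i) => ((word, uuid, i), ()) }
          .toSet
      })
      // Count occurrences of each word (this Reduce is shown in the diagram;
      // the action name "CountWords" is assumed)
      .reduce("CountWords", _._1, (k, v) => 1L, LongSum)
      // Key (-count, word) so the lowest keys are the most frequent words
      .map("CountOrder", (word, count) => ((-count, word), ()))
      .first("first", 20)
      .map("CountOrder1", (k, v) => (k._2, f"${-k._1}%4d"))
      .sink(words)

    run(f1, f2, f3, f4)
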
Labels, To and From
• Label[K,V]: connects an action's output to another action's input
• .to[K,V]: sends one output to multiple outputs (broadcast)
• .from[K,V]: combines multiple inputs into a single output (union)

FlatMap Action
• Action input type K,V
• Action output type KOUT,VOUT
• Action function type: (K,V) => Set[(KOUT,VOUT)]

First Action
• Action input type K,V
• Action output type K,V
• Count: a small integer
• Outputs at most count key-values
  • those with the lowest keys
  • requires that type K be ordered

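A sketch of the First action's semantics, assuming the action keeps its full store in key order (a TreeMap here) and outputs only the count lowest keys; the First class and its methods are hypothetical. Keeping the whole store is what lets a delete promote a previously hidden key into the output.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical First action: full store in key order, output = lowest `count` keys.
final class First[K: Ordering, V](count: Int) {
  private var kv = TreeMap.empty[K, V]

  def set(k: K, v: V): Unit = kv = kv.updated(k, v)
  def delete(k: K): Unit = kv = kv - k

  // Current output: at most `count` pairs with the lowest keys, in key order.
  def output: List[(K, V)] = kv.take(count).toList
}
```

The usage in the test follows the (-count, word) keys from the windowing code, so the lowest keys are the most frequent words.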
Join Action
• Two inputs: K1,V1 and K2,V2
• Two key-value stores
• Output types KOUT and VOUT
• Two mapping functions: K1 => K and K2 => K (many to one)
• An operation
  • applied to each member of the product of KV pairs from the two stores that map to the same K value
  • (K1,V1,K2,V2) => Set[(KOUT,VOUT)]

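The Join semantics can be written as a batch function over two maps. This is a sketch of the result, not LambdaFlow's incremental implementation; JoinSketch.join and its parameter order are hypothetical. The test reproduces the percent calculation from the windowing example, where both key functions map to Unit so every count pairs with the single total.

```scala
// Batch sketch of Join: pair entries whose mapped keys are equal, apply op to each pair.
object JoinSketch {
  def join[K1, V1, K2, V2, K, KOUT, VOUT](
      left: Map[K1, V1], right: Map[K2, V2],
      lk: K1 => K, rk: K2 => K)(
      op: (K1, V1, K2, V2) => Set[(KOUT, VOUT)]): Map[KOUT, VOUT] = {
    // Index the right store by its mapped key K
    val rightByKey: Map[K, Iterable[(K2, V2)]] = right.groupBy { case (k2, _) => rk(k2) }
    left.toSeq.flatMap { case (k1, v1) =>
      rightByKey.getOrElse(lk(k1), Nil).flatMap { case (k2, v2) => op(k1, v1, k2, v2) }
    }.toMap
  }
}
```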
Viewer
[Diagram: Tweets → LambdaFlow → remote actor → FlowView (Vaadin) → web browser]

Windowing Example
• Start up the Vaadin viewer and a web browser
• Run it (demo.TwitterFlow)
• Look at the code (TwitterFlow.scala)

Index and Query Example

Index and Query Example
• Input data: breweries
  • look at breweries.data
• Build indexes
  • set of breweries at a location (state/country)
  • set of breweries with a keyword (in its description)
• Use a jline3 command handler

Index Example
[Diagram: action graph from Breweries to the Names, Places and Words outputs, built from Map, FlatMap and Reduce actions]

Query
• words: Sink[String,Set[Int]]
• words.get(word) => Future[Option[Set[Id]]]
• breweries: Source[Int,String]: id -> JSON string
• def name(json: String): String (extracts the brewery name)
• breweries.get(id, name) => Future[Option[String]]

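A sketch of the keyword index and query using plain collections and scala.concurrent.Future in place of LambdaFlow's Sink and Source. It assumes the brewery descriptions have already been extracted from the JSON into an id -> description map and that words are split on non-word characters; BreweryIndex, wordIndex and get are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BreweryIndex {
  // FlatMap (id -> each word of its description), then Reduce into word -> set of ids.
  def wordIndex(descriptions: Map[Int, String]): Map[String, Set[Int]] =
    descriptions.toSeq
      .flatMap { case (id, text) =>
        text.toLowerCase.split("\\W+").toSeq.filter(_.nonEmpty).map(w => (w, id))
      }
      .groupMap(_._1)(_._2)
      .map { case (w, ids) => w -> ids.toSet }

  // Asynchronous lookup in the style of words.get(word) => Future[Option[Set[Id]]].
  def get(index: Map[String, Set[Int]], word: String)(implicit ec: ExecutionContext): Future[Option[Set[Int]]] =
    Future(index.get(word.toLowerCase))

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val index = wordIndex(Map(1 -> "Hoppy pale ale", 2 -> "Dark stout and pale malt"))
    println(Await.result(get(index, "pale"), 2.seconds))
  }
}
```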
Graph Example

Input Data: A Directed Graph
[Figure: directed graph on nodes A, B, C, D, E, F]

Graph Example
• Note: there are two graphs
  • the input data
  • the graph of actions
• Output
  • number of distinct paths between two nodes
  • set of all cycles
• Loop in the action graph for transitive closure
  • like a recursive function
  • the functional semantics is the fixed point of the loop

Graph Transitive Closure
• Given
  • route C -> E -> F
  • link A -> C
• Form the product (common key = C)
  • route C -> E -> F (routes starting at C)
  • flipped link C -> A (links ending at C)
• Result
  • route A -> C -> E -> F (goes back around the loop!)

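The loop can be sketched as a fixed-point computation on plain collections: each round joins routes starting at a node with links ending at that node and prepends the link's source, until no new routes appear. To guarantee termination, this sketch assumes routes never repeat a node, except that a route may return to its own start, which marks a cycle and is not extended further. The edge set in the test and all names are hypothetical; the slides' input graph is not reproduced.

```scala
object TransitiveClosure {
  type Route = List[String] // nodes in order, e.g. List("A", "C", "E", "F")

  def closure(links: Set[(String, String)]): Set[Route] = {
    var routes: Set[Route] = links.map { case (a, b) => List(a, b) } // initial routes: the edges
    var changed = true
    while (changed) {
      val extended = for {
        r <- routes
        if r.head != r.last                        // a completed cycle is not extended
        (from, to) <- links
        if to == r.head                            // join key: the route's first node
        if !r.contains(from) || from == r.last     // no repeats, except closing a cycle
      } yield from :: r
      val next = routes ++ extended
      changed = next.size != routes.size           // stop at the fixed point
      routes = next
    }
    routes
  }

  def cycles(routes: Set[Route]): Set[Route] = routes.filter(r => r.head == r.last)

  def pathCount(routes: Set[Route], from: String, to: String): Int =
    routes.count(r => r.head == from && r.last == to && from != to)
}
```

Each cycle appears once per starting node (A -> B -> C -> A, B -> C -> A -> B, and so on); a fuller version might normalize these rotations.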
Graph Example
[Diagram: action graph with Edges, Routes, Link, Cycles and Count, using Map (toPath, Route, Invert), Join, Filter (Cycles) and Reduce actions]

Graph Example
• Start the Vaadin viewer and a browser window
• Run the example (demo.GraphCycle)
• showCycles
• Look at the code

Modified Graph
[Figure: modified directed graph on nodes A, B, C, D, E, F]

Modify Graph
• set F A
• look at the browser
• showCycles
• delete F A
• look at the browser
• showCycles

Temperatures

Temperature Grid (interior cells labeled column,row; T1 to T12 are fixed boundary temperatures)

          T1    T2    T3
    T4    0,0   1,0   2,0   T7
    T5    0,1   1,1   2,1   T8
    T6    0,2   1,2   2,2   T9
          T10   T11   T12

Computation
• New temperature of each interior cell = (U + D + L + R) / 4, where U, D, L and R are its up, down, left and right neighbors

Temp Example
[Diagram: action graph with Edges and Temps, using Map, FlatMap (1 => 4) and Reduce (4 => 1) actions]

Temperature Result

          30    20    10
    50    35    27    27    50
    40    34    27    21    10
    30    33    27    20    10
          40    30    20

Temperature Change (values shown as old/new)

           30       20       10
    50/10  35/24    27/25    27/28    50
    40     34/32    27/29    21/27    10
    30     33/34    27/33    20/40    10/80
           40       30       20

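A sketch of the relaxation following the action pattern in the Temp Example diagram, on plain collections: every cell sends its temperature to its four neighbors (FlatMap 1 => 4), the contributions are summed per interior cell (Reduce 4 => 1), and each sum is divided by 4 (Map). Iterating until nothing changes gives the steady state. The boundary values T1 to T12 are read off the Temperature Result table; the (column, row) coordinates and all names are assumptions.

```scala
object TempGrid {
  type Cell = (Int, Int) // (column, row); interior cells are (0..2, 0..2)
  val Size = 3

  // Fixed boundary temperatures taken from the Temperature Result table.
  val boundary: Map[Cell, Double] = Map(
    (0, -1) -> 30.0, (1, -1) -> 20.0, (2, -1) -> 10.0, // T1..T3 (top)
    (-1, 0) -> 50.0, (-1, 1) -> 40.0, (-1, 2) -> 30.0, // T4..T6 (left)
    (3, 0) -> 50.0, (3, 1) -> 10.0, (3, 2) -> 10.0,    // T7..T9 (right)
    (0, 3) -> 40.0, (1, 3) -> 30.0, (2, 3) -> 20.0     // T10..T12 (bottom)
  )

  def isInterior(c: Cell): Boolean = c._1 >= 0 && c._1 < Size && c._2 >= 0 && c._2 < Size

  // One step: send to 4 neighbors, sum per interior cell, divide by 4.
  def step(fixed: Map[Cell, Double], interior: Map[Cell, Double]): Map[Cell, Double] =
    (fixed ++ interior).toSeq
      .flatMap { case ((x, y), t) => Seq((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)).map(n => (n, t)) }
      .filter { case (cell, _) => isInterior(cell) } // boundary cells never change
      .groupMapReduce(_._1)(_._2)(_ + _)
      .map { case (cell, sum) => (cell, sum / 4) }

  // Iterate from all-zero interior temperatures until the largest change is tiny.
  def solve(fixed: Map[Cell, Double], tolerance: Double = 1e-9): Map[Cell, Double] = {
    var current: Map[Cell, Double] =
      (for (x <- 0 until Size; y <- 0 until Size) yield ((x, y), 0.0)).toMap
    var delta = Double.MaxValue
    while (delta > tolerance) {
      val next = step(fixed, current)
      delta = next.keys.map(c => math.abs(next(c) - current(c))).max
      current = next
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val t = solve(boundary)
    for (y <- 0 until Size) println((0 until Size).map(x => f"${t((x, y))}%5.1f").mkString(" "))
  }
}
```

The solved grid comes out within a degree of the Temperature Result table. Re-solving with T4 = 10 and T9 = 80 should land close to the new values in the Temperature Change table; LambdaFlow would instead propagate only the changes from the two updated boundary cells.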
Page Rank

Page Rank Data
[Figure: directed graph on nodes A, B, C, D, E, F]

Page Rank Example
[Diagram: action graph with Edges, Rank, Initial Val, Invert, Bias, Div by Cnt, Reduce Sum and Select, built from Map, FlatMap, Reduce and Join actions]

Page Rank Result
[Figure: the Page Rank data graph annotated with the computed ranks 1.4, .60, .88, .28, .75 and .47]

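A sketch of one common Page Rank formulation, PR(p) = (1 - d) + d * Σ PR(q) / outDegree(q) over links q -> p, which has the same ingredients as the diagram (an initial value, a bias term, division by the link count, and a sum). The damping factor d = 0.85, the fixed iteration count and the edge set in the test are assumptions, not the slide's data, so this does not reproduce the ranks shown above.

```scala
object PageRankSketch {
  def ranks(links: Set[(String, String)], d: Double = 0.85, iterations: Int = 100): Map[String, Double] = {
    val nodes = links.flatMap { case (a, b) => Set(a, b) }
    val outDegree = links.toSeq.groupMapReduce(_._1)(_ => 1)(_ + _)
    var pr: Map[String, Double] = nodes.map(n => n -> 1.0).toMap // initial value
    for (_ <- 1 to iterations) {
      // Each page divides its rank by its out-degree and sends a share along each link.
      // toSeq matters: mapping over a Set would merge equal (target, share) pairs.
      val incoming = links.toSeq
        .map { case (from, to) => (to, pr(from) / outDegree(from)) }
        .groupMapReduce(_._1)(_._2)(_ + _)
      pr = nodes.map(n => n -> ((1 - d) + d * incoming.getOrElse(n, 0.0))).toMap // bias + damped sum
    }
    pr
  }
}
```

When every page has at least one outgoing link, the ranks in this formulation always sum to the number of pages, which makes a convenient sanity check.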
Questions

LambdaFlow and Kafka: Common Properties
• JVM based
• Integrate with Kafka topics
• Key-value based
• One message at a time
• Operations: map, reduce, …
• Scalable and fault tolerant
• Open source

LambdaFlow versus Kafka

    LambdaFlow      Kafka
    Akka Cluster    Zookeeper
    Scala           Java
    Set/Delete      Set
    Functional      Imperative
