Machine Learning At Speed: Operationalizing ML For Real-Time Data Streams
ML is simple: Data → Magic → Happiness
Maybe not.
The reality is a cycle: Set business goals → Understand your data → Create hypothesis → Define experiments → Prepare data → Train/tune models → Verify/test models → Export models → Score models → Measure/evaluate results.
Our topic: the export/scoring (serving) part of this cycle.
What is a model? A model is a function transforming inputs to outputs, y = f(x), for example:
• Linear regression: y = a0 + a1*x1 + … + an*xn
• Neural network: f(x) = K(∑i wi gi(x))
Such a definition of a model allows for easy composition of models: from the implementation point of view it is just function composition, as the sketch below illustrates.
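To make the function view concrete, here is a minimal Scala sketch (linearRegression and featurize are hypothetical examples, not from any library):

  // A model is just a function from input to output: y = f(x).
  type ModelFunction[A, B] = A => B

  // Hypothetical linear regression: y = a0 + a1*x1 + ... + an*xn.
  def linearRegression(a0: Double, as: Seq[Double]): ModelFunction[Seq[Double], Double] =
    xs => a0 + as.zip(xs).map { case (a, x) => a * x }.sum

  // Composition of models is just function composition.
  val featurize: ModelFunction[String, Seq[Double]] =
    s => s.split(",").map(_.toDouble).toSeq
  val regression = linearRegression(0.5, Seq(1.0, 2.0))
  val pipeline: ModelFunction[String, Double] = featurize andThen regression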
Model learning pipeline. UC Berkeley AMPLab introduced machine learning pipelines as a graph defining the complete chain of data transformations.
[Diagram: input data stream → data preprocessing → predictive model → data postprocessing → results; the model learning pipeline produces the model, with preprocessing feeding the model's inputs and the model's outputs feeding postprocessing]
Traditional approach to model serving:
• The model is code
• This code has to be saved and then somehow imported into model serving
Why is this problematic?
Impedance mismatch: a continually expanding data scientist toolbox versus a well-defined software engineer toolbox.
Alternative: model as data. The model is exported as a model document; a generic model evaluator consumes the document together with the data and produces results.
[Diagram: exported data + exported model document → model evaluator → results]
Standards for such model documents include the Portable Format for Analytics (PFA) and PMML.
Exporting a model as data with PMML. There are already a lot of export options:
• https://github.com/jpmml/jpmml-sparkml
• https://github.com/jpmml/jpmml-sklearn
• https://github.com/jpmml/jpmml-r
• https://github.com/jpmml/jpmml-tensorflow
Evaluating a PMML model. There are also a couple of PMML evaluators; a usage sketch follows.
• https://github.com/jpmml/jpmml-evaluator
• https://github.com/opendatagroup/augustus
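For illustration, a minimal Scala sketch of scoring with the JPMML evaluator. Class and method names such as LoadingModelEvaluatorBuilder follow the JPMML-Evaluator 1.5.x README; the file name and input fields are hypothetical, so verify against the library version you use:

  import java.io.File
  import scala.jdk.CollectionConverters._
  import org.jpmml.evaluator.{EvaluatorUtil, LoadingModelEvaluatorBuilder}

  object PmmlScoring extends App {
    // Load and self-check a PMML model produced by one of the exporters above.
    val evaluator = new LoadingModelEvaluatorBuilder()
      .load(new File("model.pmml"))
      .build()
    evaluator.verify()

    // Hypothetical raw input record, keyed by the model's input field names.
    val record = Map[String, Any]("x1" -> 1.0, "x2" -> 2.0)

    // Prepare each declared input field (type conversion and validation).
    val arguments = evaluator.getInputFields.asScala.map { field =>
      field.getName -> field.prepare(record(field.getName.getValue).asInstanceOf[AnyRef])
    }.toMap

    // Evaluate and decode the results into plain values keyed by field name.
    val results = EvaluatorUtil.decodeAll(evaluator.evaluate(arguments.asJava))
    println(results)
  }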
Exporting a model as data with TensorFlow:
• TensorFlow execution is based on tensors and graphs
• Tensors are defined as multilinear functions of various vector variables
• A computational graph is a series of TensorFlow operations arranged into a graph of nodes
• TensorFlow supports exporting such a graph in the form of binary protocol buffers
• There are two different export formats: the optimized graph and a newer format, the saved model
Evaluating a TensorFlow model:
• TensorFlow is implemented in C++ with a Python interface
• To simplify TensorFlow usage from Java, in 2017 Google introduced the TensorFlow Java APIs
• The TensorFlow Java APIs support importing exported models and using them for scoring, as in the sketch below
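A sketch of scoring a SavedModel from Scala via the TensorFlow Java API (org.tensorflow, 1.x style); the model path and the tensor names "x" and "y" are hypothetical and depend on how the graph was exported:

  import org.tensorflow.{SavedModelBundle, Tensor}

  object TfScoring extends App {
    // Load a SavedModel exported with the "serve" tag.
    val bundle = SavedModelBundle.load("/models/my_model", "serve")

    // Feed one record into the input tensor and fetch the output tensor.
    val input = Tensor.create(Array(Array(1.0f, 2.0f, 3.0f)))
    val output = bundle.session().runner()
      .feed("x", input)
      .fetch("y")
      .run()
      .get(0)

    // Copy the result out of the native tensor.
    val scores = Array.ofDim[Float](1, 1)
    output.copyTo(scores)
    println(scores(0)(0))

    input.close(); output.close(); bundle.close()
  }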
Additional considerations – model lifecycle:
• Models tend to change
• Update frequencies vary greatly – from hourly to quarterly/yearly
• Model version tracking
• Model release practices
• Model update process
Traditional model serving implementations. Models are deployed separately from stream processing:
• TensorFlow Serving
• Clipper
• Apache MXNet Model Server
• DeepDetect
• TensorRT
Log-driven enterprise:
• Complete decoupling of service interactions
• All communications go through the log rather than services talking to each other
• Stream processors do not need to explicitly talk to the services; all updates are directly available in the log
Model serving in a log-driven enterprise: a streaming system that allows updating models without interrupting execution (a dynamically controlled stream).
[Diagram: a data source feeds the data stream and a machine learning process feeds the model stream into the streaming engine; model updates replace the current model, records are scored against it, and results flow to additional processing; external model storage is optional]
Model representation. On the wire:

  syntax = "proto3";
  // Description of the trained model.
  message ModelDescriptor {
      // Model name
      string name = 1;
      // Human-readable description.
      string description = 2;
      // Data type for which this model is applied.
      string dataType = 3;
      // Model type
      enum ModelType {
          TENSORFLOW = 0;
          TENSORFLOWSAVED = 1;
          PMML = 2;
      };
      ModelType modeltype = 4;
      oneof MessageContent {
          // Byte array containing the model
          bytes data = 5;
          // Location of the model
          string location = 6;
      }
  }
Internally:

  trait Model {
    def score(input: AnyVal): AnyVal
    def cleanup(): Unit
    def toBytes(): Array[Byte]
    def getType: Long
  }

  trait ModelFactory {
    def create(input: ModelDescriptor): Model
    def restore(bytes: Array[Byte]): Model
  }
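One possible way to tie the two representations together: a registry of factories keyed by model type, turning each incoming descriptor into a Model. A sketch only; PMMLModelFactory and TensorflowModelFactory are hypothetical implementations of the ModelFactory trait, and the modeltype accessor assumes ScalaPB-style generated code:

  // Hypothetical concrete factories for each wire format.
  object ModelFactoryResolver {
    private val factories: Map[ModelDescriptor.ModelType, ModelFactory] = Map(
      ModelDescriptor.ModelType.PMML       -> PMMLModelFactory,
      ModelDescriptor.ModelType.TENSORFLOW -> TensorflowModelFactory
    )
    def getFactory(descriptor: ModelDescriptor): Option[ModelFactory] =
      factories.get(descriptor.modeltype)
  }

  // On every record of the model stream: resolve a factory, build the new model.
  def onModelDescriptor(descriptor: ModelDescriptor): Option[Model] =
    ModelFactoryResolver.getFactory(descriptor).map(_.create(descriptor))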
Additional considerations – monitoring. Model monitoring should provide information about the usage, behavior, performance, and lifecycle of the deployed models; a sketch of updating these stats follows.

  case class ModelToServeStats(
    name: String,                         // Model name
    description: String,                  // Model description
    modelType: ModelDescriptor.ModelType, // Model type
    since: Long,                          // Start time of model usage
    var usage: Long = 0,                  // Number of servings
    var duration: Double = 0.0,           // Time spent on serving
    var min: Long = Long.MaxValue,        // Min serving time
    var max: Long = Long.MinValue         // Max serving time
  )
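A minimal sketch of recording one serving, assuming one stats object is kept per deployed model:

  // Score an input, measure latency, and fold it into the running statistics.
  def scoreAndRecord(model: Model, stats: ModelToServeStats, input: AnyVal): AnyVal = {
    val start = System.nanoTime()
    val result = model.score(input)
    val elapsed = (System.nanoTime() - start) / 1000000 // milliseconds
    stats.usage += 1
    stats.duration += elapsed
    stats.min = math.min(stats.min, elapsed)
    stats.max = math.max(stats.max, elapsed)
    result
  }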
Queryable state. Queryable state (interactive queries) is an approach that allows you to get more from streaming than just the processing of data. It treats the stream-processing layer as a lightweight embedded database; more concretely, it lets you directly query the current state of a stream-processing application without first materializing that state to an external database or external storage.
[Diagram: a stream source feeds the stream processor inside the streaming engine; other applications, such as monitoring, read its state through interactive queries]
A sketch of querying such state from a monitoring application follows.
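For example, with Flink's queryable state a monitoring application could read the current ModelToServeStats directly from the running job. A sketch only; the proxy host/port, job ID, key, and the state name "currentModelStats" are assumptions that must match what the streaming job registers as queryable:

  import org.apache.flink.api.common.JobID
  import org.apache.flink.api.common.state.ValueStateDescriptor
  import org.apache.flink.api.scala.createTypeInformation
  import org.apache.flink.queryablestate.client.QueryableStateClient

  // Connect to the queryable state proxy of a task manager.
  val client = new QueryableStateClient("flink-taskmanager", 9069)

  // Must match the ValueStateDescriptor the streaming job made queryable.
  val descriptor = new ValueStateDescriptor[ModelToServeStats](
    "currentModelStats", classOf[ModelToServeStats])

  client.getKvState(
      JobID.fromHexString("0123456789abcdef0123456789abcdef"),
      "currentModelStats",           // queryable state name
      "wine",                        // the key the state is partitioned by (hypothetical)
      createTypeInformation[String],
      descriptor)
    .thenAccept(state => println(state.value()))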
Implementation options. Modern stream-processing engines (SPEs) take advantage of cluster architectures: they organize computations into a set of operators, which enables execution parallelism; different operators can run on different threads or different machines. A stream-processing library (SPL), on the other hand, is a library, often with a domain-specific language (DSL), of constructs that simplify building streaming applications.
Decision criteria:
• An SPE is a good fit for applications that require the features such engines provide out of the box, but you need to adhere to its programming and deployment models.
• SPLs provide a programming model that lets developers build applications or microservices in the way that fits their precise needs and deploy them as simple standalone Java applications.
Akka Streams:
• Part of the Akka project, a library focused on in-process, back-pressured reactive streaming.
• Provides a broad ecosystem of connectors to various technologies (data stores, message queues, file stores, streaming services, etc.) – Alpakka.
• Computations are written in a graph-resembling domain-specific language (DSL), which aims to make translating graph drawings to and from code simpler.
Using a custom stage. Create a custom stage, which is a fully type-safe way to encapsulate the required functionality. Our stage will provide functionality somewhat similar to a Flink low-level join; a minimal sketch follows.
[Diagram: Source 1 → Alpakka Flow 1 → data stream; Source 2 → Alpakka Flow 2 → model stream; both feed the custom model-serving stage, which emits the results stream]
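A minimal sketch of such a stage, assuming the model is represented as a plain function IN => OUT (matching the model-as-function view earlier in the deck). Data records are scored against the last model received on the model inlet:

  import akka.stream._
  import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

  class ModelServingStage[IN, OUT]
      extends GraphStage[FanInShape2[IN, IN => OUT, Option[OUT]]] {
    override val shape = new FanInShape2[IN, IN => OUT, Option[OUT]]("ModelServing")
    private def dataIn  = shape.in0
    private def modelIn = shape.in1
    private def out     = shape.out

    override def createLogic(attrs: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        private var currentModel: Option[IN => OUT] = None
        override def preStart(): Unit = pull(modelIn) // start consuming models

        // Keep only the freshest model seen on the model inlet.
        setHandler(modelIn, new InHandler {
          override def onPush(): Unit = {
            currentModel = Some(grab(modelIn))
            pull(modelIn)
          }
        })
        // Score each data record with the current model (None until one arrives).
        setHandler(dataIn, new InHandler {
          override def onPush(): Unit = {
            val record = grab(dataIn)
            push(out, currentModel.map(_(record)))
          }
        })
        // Demand from downstream drives the data inlet.
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(dataIn)
        })
      }
  }

The stage can then be wired with the GraphDSL, connecting the two Alpakka flows to its two inlets.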
Improving scalability. Use a router actor to forward requests to an individual actor responsible for processing requests for a specific model type; a router sketch follows.
[Diagram: the data and model streams feed a model-serving router, which dispatches to a pool of model-serving actors]
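A sketch of the router (Akka classic actors); the (modelType, message) protocol and the childProps factory are hypothetical:

  import akka.actor.{Actor, ActorRef, Props}

  // Routes every (modelType, message) pair to the child actor owning that
  // model type, creating the child on first use.
  class ModelServingRouter(childProps: String => Props) extends Actor {
    private def childFor(modelType: String): ActorRef =
      context.child(modelType)
        .getOrElse(context.actorOf(childProps(modelType), modelType))

    override def receive: Receive = {
      case (modelType: String, message) => childFor(modelType) forward message
    }
  }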
Flink is an open-source stream-processing engine (SPE) that:
• Scales well, running on thousands of nodes.
• Provides powerful checkpointing and savepointing facilities that enable fault tolerance and restartability.
• Provides state support for streaming applications, which minimizes the need for external databases.
• Provides powerful window semantics, allowing you to produce accurate results even in the case of out-of-order or late-arriving data.
Flink low-level join:
• Create a state object for one input (or both)
• Update the state upon receiving elements from its input
• Upon receiving elements from the other input, probe the state and produce the joined result
[Diagram: the join component keeps the current state; stream 1 and stream 2 records are processed against it to produce the results stream]
Key-based join. Flink's CoProcessFunction allows a key-based merge of two streams. When using this API, data is key-partitioned across multiple Flink executors; records from both streams are routed (based on key) to the appropriate executor responsible for the actual processing. A sketch follows.
[Diagram: both the data stream and the model stream are split into key groups 1..n; each key group is served by its own model server instance, which emits serving results]
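A sketch of the key-based join with the model kept in Flink keyed state. DataRecord and ModelUpdate are placeholder types (in the real implementation the model would be restored from a ModelDescriptor via the ModelFactory shown earlier, and state serialization needs more care than shown here):

  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.streaming.api.functions.co.CoProcessFunction
  import org.apache.flink.util.Collector

  case class DataRecord(key: String, features: Array[Double])
  case class ModelUpdate(key: String, model: Array[Double] => Double)

  // Keyed state holds the current model for this key; data elements probe the
  // state, model elements update it.
  class ModelServer
      extends CoProcessFunction[DataRecord, ModelUpdate, Option[Double]] {
    private lazy val modelState: ValueState[ModelUpdate] =
      getRuntimeContext.getState(
        new ValueStateDescriptor[ModelUpdate]("model", classOf[ModelUpdate]))

    override def processElement1(
        record: DataRecord,
        ctx: CoProcessFunction[DataRecord, ModelUpdate, Option[Double]]#Context,
        out: Collector[Option[Double]]): Unit =
      out.collect(Option(modelState.value()).map(_.model(record.features)))

    override def processElement2(
        update: ModelUpdate,
        ctx: CoProcessFunction[DataRecord, ModelUpdate, Option[Double]]#Context,
        out: Collector[Option[Double]]): Unit =
      modelState.update(update)
  }

Wired as dataStream.keyBy(_.key).connect(modelStream.keyBy(_.key)).process(new ModelServer).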
Partition-based join. Flink's RichCoFlatMapFunction allows merging two streams in parallel (based on the parallelism parameter). When using this API, data from different partitions of the partitioned stream is processed by a dedicated Flink executor. A sketch follows.
[Diagram: the model stream reaches every model server instance, while data partitions 1..n are each processed by a dedicated instance, which emits serving results]
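The partition-based variant, reusing the DataRecord and ModelUpdate placeholders from the previous sketch. Each parallel operator instance keeps the current model in a plain field (checkpointing of this operator state is elided):

  import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
  import org.apache.flink.util.Collector

  class ModelServerPartitioned
      extends RichCoFlatMapFunction[DataRecord, ModelUpdate, Option[Double]] {
    private var currentModel: Option[Array[Double] => Double] = None

    // Score each data record with the model currently held by this instance.
    override def flatMap1(record: DataRecord,
                          out: Collector[Option[Double]]): Unit =
      out.collect(currentModel.map(_(record.features)))

    // Replace this instance's model on every model-stream element.
    override def flatMap2(update: ModelUpdate,
                          out: Collector[Option[Double]]): Unit =
      currentModel = Some(update.model)
  }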
Additional architectural concerns for model serving:
• Model tracking
• Speculative model execution
Model tracking – motivation:
• You update your model periodically
• You score a particular record R with model version N
• Later, you audit the data and wonder why R was scored the way it was
• You can't answer that question unless you know which model version was actually used for R
Model tracking:
• You need to remember models – a model repository
• Basic info for each model: name, version (or other unique ID), creation date, quality metric, definition, …
Model tracking (continued). You also need to augment the output records with the model ID as well as the score.
[Diagram: an input record becomes an output record extended with the score and the model version ID]
Speculative execution. According to Wikipedia, speculative execution is:
• an optimization technique
• the system performs work that may not be needed, before it is known whether it will be needed
• so if and when we discover it IS needed, we don't have to wait
• results are discarded if not needed
General architecture for speculative execution:
• A starter (proxy) controlling parallelism and the invocation strategy
• Parallel execution by multiple executors
• A collector responsible for bringing the results from multiple executors together
Speculative execution provides more concurrency if extra resources are available. It is used for:
• branch prediction in pipelined processors,
• value prediction for exploiting value locality,
• prefetching memory and files,
• etc.
Why not use it with machine learning?
Applicability for model serving – guaranteed execution time. Run several models:
• A smart model, which takes time T1
• A "less smart" but fast model with a fixed upper limit on execution time, T2 << T1
• If a timeout (T > T2) occurs before the smart model finishes, return the less accurate result (see the sketch below)
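A sketch of time-bound speculative serving with plain Scala futures; smartModel and fastModel are hypothetical scoring functions, and a production version would compose futures non-blockingly instead of using Await:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  // Start the smart model; if it misses the budget, fall back to the fast one.
  def serve(input: Seq[Double],
            smartModel: Seq[Double] => Double,
            fastModel:  Seq[Double] => Double,
            budget: FiniteDuration = 100.millis): Double = {
    val smart = Future(smartModel(input))
    try Await.result(smart, budget)
    catch { case _: java.util.concurrent.TimeoutException => fastModel(input) }
  }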
Applicability for model serving – consensus-based serving. If we have three or more models, score with all of them and return the majority result.
Applicability for model serving – quality-based serving. If we have a quality metric, pick the result with the best metric value. Of course, you can combine these techniques.
Speculative model serving architecture [diagram slide]
Akka Streams implementation [diagram slide]
Flink implementation:
• The router processor implements the starter
• Model processors act as the executors
• The speculative processor implements the collector
Want to learn more? Serving Machine Learning Models: A Guide to Architecture, Stream Processing Engines, and Frameworks, by Boris Lublinsky. Get your free copy.
Thank you! Any questions?