Project Hydrogen: Unifying State-of-the-art Big Data and AI in Apache Spark Xiangrui Meng 2018/07
Spark: unified analytics engine for big data
“Apache Spark is the Taylor Swift of big data software.” - Fortune
500,000 meetup members; 4,000 summit attendees
AI: re-shaping the world
“Artificial Intelligence is the New Electricity.” - Andrew Ng
Applications: Transportation, Healthcare and Genomics, Internet of Things, Fraud Prevention, Personalization
Get started with Spark + AI? Try Databricks Community Edition: http://databricks.com/try
Two communities: big data and AI
Significant progress has been made by both the big data and AI communities in recent years to push the cutting edge: more complex big data scenarios and more complex deep learning scenarios.
The cross? 6
(Diagram: the big data side lists Map/Reduce, RDD, DataFrame-based APIs, 50+ Data Sources, Python/Java/R interfaces, Structured Streaming, Continuous Processing, ML Pipelines API, Project Tungsten, Pandas UDF, TensorFrames, CaffeOnSpark, TensorFlowOnSpark; the AI side lists scikit-learn, pandas/numpy/scipy, LIBLINEAR, R glmnet, xgboost, GraphLab, Caffe/PyTorch/MXNet, TensorFlow, Keras, Distributed TensorFlow, Horovod, tf.data, tf.transform, TF XLA; the AI/ML intersection is marked "??".)
Neither big data nor AI is easy
“Hidden Technical Debt in Machine Learning Systems,” Google NIPS 2015, Figure 1: “Only a small fraction of real-world ML systems is composed of the ML code, as shown by the small green box in the middle. The required surrounding infrastructure is vast and complex.”
(Surrounding components in the figure: Configuration, Data Collection, Data Verification, Feature Extraction, Machine Resource Management, Analysis Tools, Process Management Tools, Serving Infrastructure, Monitoring.)
Big data for AI
A common goal of collecting, processing, and managing big data is to extract value from it, and AI/ML is the approach most often mentioned in our conversations with customers:
● improved customer engagement and conversions by improving image classification (case study)
● increased customer satisfaction, retention, and lifetime value by detecting abusive language in real time (case study)
Big data for AI There are many efforts from the Spark community to integrate Spark with AI/ML frameworks: ● (Yahoo) CaffeOnSpark, TensorFlowOnSpark ● (Intel) BigDL ● (John Snow Labs) Spark-NLP ● (Databricks) spark-sklearn, tensorframes, spark-deep-learning ● … 80+ ML/AI packages on spark-packages.org
AI needs big data
One cannot apply AI techniques without data, and DL model performance scales with the amount of data. We have seen efforts from the AI community to handle different data scenarios:
● tf.data, tf.Transform
● spark-tensorflow-connector
● ...
(Chart: model performance vs. amount of data; source: Andrew Ng)
When AI goes distributed ... When datasets get bigger and bigger, we see more and more distributed training scenarios and open-source offerings, e.g., distributed TensorFlow, Horovod, and distributed MXNet. This is where we see Spark and AI efforts overlap more.
The status quo: two simple stories As a data scientist, I can: ● build a pipeline that fetches training events from a production data warehouse and trains a DL model in parallel; ● apply a trained DL model to a distributed stream of events and enrich it with predicted labels.
Distributed training
(Pipeline: data warehouse, e.g., Databricks Delta → load → fit → model)
Required: ability to read from Databricks Delta, Parquet, MySQL, Hive, etc. Answer: Apache Spark.
Required: a distributed GPU cluster for fast training. Answer: Horovod, Distributed TensorFlow, etc.
Two separate data and AI clusters?
(Pipeline: load using a Spark cluster → save data → fit on a GPU cluster → model)
Required: glue code between the two clusters.
Streaming model inference
(Pipeline: Kafka → load → predict with model)
Required:
● save to a stream sink
● GPU for fast inference
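To make this story concrete, here is a minimal Structured Streaming sketch (not from the deck). It assumes the spark-sql-kafka connector is on the classpath, a broker at broker:9092, a topic named events, and a hypothetical predict function standing in for a real DL model.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("streaming-inference").getOrCreate()
import spark.implicits._

// Hypothetical scoring function; a real job would wrap a loaded DL model here.
val predict = udf { (payload: String) => 0.0 /* model.score(parse(payload)) */ }

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // assumed broker address
  .option("subscribe", "events")                      // assumed topic name
  .load()

// Enrich the stream with predicted labels and save to a stream sink.
events
  .selectExpr("CAST(value AS STRING) AS payload")
  .withColumn("label", predict($"payload"))
  .writeStream
  .format("parquet")
  .option("path", "/tmp/predictions")
  .option("checkpointLocation", "/tmp/checkpoints")
  .start()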
A hybrid Spark and AI cluster
● Training: load data using a Spark cluster w/ GPUs, then fit a model in a distributed fashion on the same cluster.
● Inference: load data and the model using a Spark cluster w/ GPUs, then predict w/ GPUs as a Spark task.
Unfortunately, it doesn’t work out of the box. (Demo that a distributed DL job needs to run as a Spark job to avoid crashing into other Spark workloads on the same cluster.)
18-20 Different execution models
Spark: Task 1, Task 2, Task 3 are independent of each other; embarrassingly parallel & massively scalable; if one crashes, rerun just that one.
Distributed training: complete coordination among tasks; optimized for communication; if one crashes, all tasks must be rerun.
21 Project Hydrogen: Spark + AI
● Barrier Execution Mode
● Optimized Data Exchange
● Accelerator-Aware Scheduling
22 Barrier execution mode
We introduce gang scheduling to Spark on top of the MapReduce execution model, so a distributed DL job can run as a Spark job.
● It starts all tasks together.
● It provides sufficient info and tooling to run a hybrid distributed job.
● It cancels and restarts all tasks in case of failures.
Umbrella JIRA: SPARK-24374 (ETA: Spark 2.4, 3.0)
23 RDD.barrier()
RDD.barrier() tells Spark to launch the tasks together.
rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  ...
}
24 context.barrier()
context.barrier() places a global barrier and waits until all tasks in this stage hit this barrier.
val context = BarrierTaskContext.get()
... // write partition data out
context.barrier()
25 context.getTaskInfos()
context.getTaskInfos() returns info about all tasks in this stage.
if (context.partitionId == 0) {
  val hosts = context.getTaskInfos().map(_.host)
  ... // start a hybrid training job, e.g., via MPI
}
context.barrier() // wait until training finishes
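Putting the three pieces together, a minimal sketch of a barrier-mode training stage (not from the deck): rdd is an existing RDD of training data, writePartitionToLocalDisk and launchDistributedTraining are hypothetical helpers, and the task-info field follows the snippet above while the API was still being finalized.

import org.apache.spark.BarrierTaskContext

rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()

  // Hypothetical helper: materialize this partition where the external
  // training process can read it.
  writePartitionToLocalDisk(iter)
  context.barrier()            // wait until every task has written its data

  if (context.partitionId() == 0) {
    // Task 0 gathers all task hosts and launches the distributed training
    // job, e.g., via MPI or Horovod (hypothetical helper).
    val hosts = context.getTaskInfos().map(_.host)
    launchDistributedTraining(hosts)
  }
  context.barrier()            // wait until training finishes on all hosts
  Iterator.empty
}.count()                      // action that triggers the barrier stage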
26 Cluster manager support
● YARN: SPARK-24723
● Kubernetes: SPARK-24724
● Mesos: SPARK-24725
In Spark standalone mode, users need passwordless SSH among workers to run a hybrid MPI job. The community is working on support for the other cluster managers.
27 Demo (Demo barrier mode prototype, where a distributed DL job runs as a Spark job and does not crash into other Spark workloads.)
28 Optimized data exchange (SPIP) None of the integrations are possible without exchanging data between Spark and AI frameworks. And performance matters. We proposed using a standard interface for data exchange to simplify the integrations without introducing much overhead. SPIP JIRA: SPARK-24579 (pending vote, ETA 3.0)
29 Sources × destinations
Sources: Parquet, Avro, SequenceFile, MySQL, ORC, ...
Destinations: TFRecords, CSV, LIBSVM, HDF5, NetCDF, ...
(Without a common intermediate, every source/destination pair needs its own converter: a full cross product.)
30 Sources + 2 + destinations
Sources: Parquet, Avro, SequenceFile, MySQL, ORC, ...
+ Apache Spark + Apache Arrow +
Destinations: TFRecords, CSV, LIBSVM, HDF5, NetCDF, ...
(With Spark and Arrow in the middle, each format needs only one connector instead of one per pair.)
31 From source to destination
(Diagram: Parquet file → Spark → tf.Dataset)
32 From source to destination: a long path
(Diagram: Parquet file → Spark, Tungsten format in JVM → Arrow buffer in JVM → Arrow buffer in Python → Pandas / tf.Dataset → data in GPU; a native Parquet reader is also shown.)
33 Shortcuts
(Same diagram as above, with shortcut paths that skip intermediate conversions, e.g., via the native Parquet reader.)
34 Vectorized computation
Though Spark uses a columnar storage format internally, the public user access interface is row-based. The proposal exposes batch-level access for vectorized computation:
df.toArrowRDD.map { batches =>
  ... // vectorized computation
}
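For contrast, a minimal sketch of today's row-at-a-time access through the public API (df and the column name "feature" are assumed for illustration):

// Row-based: each record passes through the API as an individual Row object,
// so per-element work cannot easily be vectorized.
val doubled = df.select("feature").rdd.map(row => row.getDouble(0) * 2.0)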
35 Generalize Pandas UDF to Scala/Java
Pandas UDF, introduced in Spark 2.3, uses Arrow for data exchange and Pandas for vectorized computation. The proposal generalizes this mechanism to Scala/Java.
36 Data pipelining
Without pipelining:
  t1: CPU fetch batch #1
  t2: GPU process batch #1
  t3: CPU fetch batch #2
  t4: GPU process batch #2
  t5: CPU fetch batch #3
  t6: GPU process batch #3
With pipelining (fetch and process overlap):
  t1: CPU fetch batch #1
  t2: CPU fetch batch #2, GPU process batch #1
  t3: CPU fetch batch #3, GPU process batch #2
  t4: GPU process batch #3
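As a generic illustration of the pipelining idea (not a Spark API), a small prefetching iterator in which a background thread fetches the next batches while the caller is still processing the current one:

import java.util.concurrent.ArrayBlockingQueue

// Wraps a batch iterator so that fetching (CPU) overlaps with processing (GPU).
class PrefetchingIterator[T](underlying: Iterator[T], bufferSize: Int = 2)
    extends Iterator[T] {
  private val queue = new ArrayBlockingQueue[Option[T]](bufferSize)
  private val producer = new Thread(() => {
    underlying.foreach(batch => queue.put(Some(batch)))
    queue.put(None)                       // end-of-stream marker
  })
  producer.setDaemon(true)
  producer.start()

  private var nextItem: Option[T] = queue.take()
  override def hasNext: Boolean = nextItem.isDefined
  override def next(): T = {
    val item = nextItem.get
    nextItem = queue.take()
    item
  }
}

// Usage sketch (fetchBatches and processOnGpu are hypothetical):
// new PrefetchingIterator(fetchBatches()).foreach(processOnGpu)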
37 Accelerator-aware scheduling (SPIP) To utilize accelerators (GPUs, FPGAs) in a heterogeneous cluster or to utilize multiple accelerators in a multi-task node, Spark needs to understand the accelerators installed on each node. SPIP JIRA: SPARK-24615 (pending vote, ETA: 3.0)
38 Request accelerators
With accelerator awareness, users can specify accelerator constraints or hints (API pending discussion):
rdd.accelerated
  .by("/gpu/p100")
  .numPerTask(2)
  .required // or .optional
39 Multiple tasks on the same node
When multiple tasks are scheduled on the same node with multiple GPUs, each task knows which GPUs are assigned to it, to avoid crashing into the others (API pending discussion):
// inside a task closure
val gpus = context.getAcceleratorInfos()
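For illustration only, a sketch of how a task might consume such an assignment; getAcceleratorInfos(), its index field, and launchWorkerProcess are all hypothetical, since the API was still under discussion at the time.

// Inside a task closure (hypothetical API, names not final).
val gpus = context.getAcceleratorInfos()            // devices assigned to this task
val deviceList = gpus.map(_.index).mkString(",")    // hypothetical `index` field

// Restrict a CUDA-based DL library to the assigned devices by launching the
// worker process with CUDA_VISIBLE_DEVICES set (hypothetical helper).
launchWorkerProcess(Map("CUDA_VISIBLE_DEVICES" -> deviceList))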
40 Cluster manager support
In Spark standalone mode, we can list accelerators in the worker conf and aggregate the information for scheduling. We also need to provide support for other cluster managers:
● YARN
● Kubernetes
● Mesos
41 Timeline
Spark 2.4
● barrier execution mode (basic scenarios)
● (Databricks) Horovod integration w/ barrier mode
Spark 3.0
● barrier execution mode
● optimized data exchange
● accelerator-aware scheduling
42 Acknowledgement
● Many ideas in Project Hydrogen are based on previous community work: TensorFrames, BigDL, Apache Arrow, Pandas UDF, Spark GPU support, MPI, etc.
● We would like to thank the many Spark committers and contributors who helped with the project proposal, design, and implementation.
Thank you!
