Alpine Academy - Spark Lightning fast cluster computing with Python and just a wee bit of Scala
Who am I? Holden I prefer she/her for pronouns Co-author of the Learning Spark book Software Engineer at IBM’s Spark Technology Center @holdenkarau http://www.slideshare.net/hkarau https://www.linkedin.com/in/holdenkarau
What we are going to explore together! What is Spark? Spark’s primary distributed collection Word count Coffee break! How PySpark works Using libraries with Spark Spark SQL / DataFrames (time permitting)
What is Spark? General purpose distributed system With a really nice API Apache project (one of the most active) Much faster than Hadoop Map/Reduce
The different pieces of Spark Apache Spark SQL & DataFrames Streaming Language APIs Scala, Java, Python, & R Graph Tools Spark ML Bagel & GraphX MLlib Community Packages
Skipping set-up time :)
Some pages to keep open for the exercises http://bit.ly/sparkDocs http://bit.ly/sparkPyDocs OR http://bit.ly/sparkScalaDoc http://bit.ly/PySparkIntroExamples http://bit.ly/learningSparkExamples OR http://spark.apache.org/docs/latest/api/python/index.html http://spark.apache.org/docs/latest/ https://github.com/holdenk/intro-to-pyspark-demos
Starting the shell ./bin/pyspark OR ./bin/spark-shell [Lots of output] SparkContext available as sc, SQLContext available as sqlContext. >>>
Reducing log level cp ./conf/log4j.properties.template ./conf/log4j.properties Then set log4j.rootCategory=ERROR, console
SparkContext: entry to the world Can be used to create RDDs from many input sources Native collections, local & remote FS Any Hadoop Data Source Also create counters & accumulators Automatically created in the shells (called sc) Specify master & app name when creating Master can be local[*], spark://, yarn, etc. App name should be human readable and make sense
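Outside the shells you create the SparkContext yourself. A minimal sketch (the app name "intro-to-spark-demo" is just a placeholder):

  from pyspark import SparkConf, SparkContext

  conf = SparkConf().setMaster("local[*]").setAppName("intro-to-spark-demo")  # master & app name
  sc = SparkContext(conf=conf)  # entry point for creating RDDs, accumulators, etc.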
RDDs: Spark’s Primary abstraction RDD (Resilient Distributed Dataset) Recomputed on node failure Distributed across the cluster Lazily evaluated (transformations & actions)
Word count lines = sc.textFile(src) words = lines.flatMap(lambda x: x.split(" ")) word_count = (words.map(lambda x: (x, 1)) .reduceByKey(lambda x, y: x+y)) word_count.saveAsTextFile(output)
Word count lines = sc.textFile(src) words = lines.flatMap(lambda x: x.split(" ")) word_count = (words.map(lambda x: (x, 1)) .reduceByKey(lambda x, y: x+y)) word_count.saveAsTextFile(output) No data is read or processed until after this line This is an “action” which forces Spark to evaluate the RDD
Word count - in Scala val lines = sc.textFile(src) val words = lines.flatMap(_.split(" ")) val word_count = words.map(word => (word, 1)).reduceByKey(_ + _) word_count.saveAsTextFile(output)
Some common transformations & actions Transformations (lazy) map filter flatMap reduceByKey join cogroup Actions (eager) count reduce collect take saveAsTextFile saveAsHadoop countByValue Photo by Steve Photo by Dan G
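A tiny sketch of the lazy/eager split (the names here are illustrative, not from the slides; sc comes from the shell):

  nums = sc.parallelize(range(10))            # RDD from a native collection
  evens = nums.filter(lambda x: x % 2 == 0)   # transformation: lazy, nothing runs yet
  doubled = evens.map(lambda x: x * 2)        # transformation: still lazy
  print(doubled.count())                      # action: triggers the whole pipeline
  print(doubled.collect())                    # action: brings the results back to the driver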
Exercise time Photo by recastle
Let’s find the lines with the word “Spark” Get started in Python: import os src = "file:///"+os.environ['SPARK_HOME']+"/README.md" lines = sc.textFile(src) Get started in Scala: val src = "file:///" + sys.env("SPARK_HOME") + "/README.md" val lines = sc.textFile(src)
What did you find?
A solution: lines = sc.textFile(src) spark_lines = lines.filter( lambda x: x.lower().find("spark") != -1) print spark_lines.count()
Combined with previous example Do you notice anything funky? We read the data in twice :( cache/persist/checkpoint to the rescue!
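One possible fix, sketched with cache() (persist()/checkpoint() are the heavier-duty alternatives):

  lines = sc.textFile(src).cache()   # mark the RDD to be kept in memory once computed
  spark_lines = lines.filter(lambda x: x.lower().find("spark") != -1)
  print(spark_lines.count())         # first action reads the file and fills the cache
  print(lines.count())               # later actions reuse the cached partitions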
Let’s use toDebugString
un-cached:
>>> print word_count.toDebugString()
(2) PythonRDD[17] at RDD at PythonRDD.scala:43 []
 | MapPartitionsRDD[14] at mapPartitions at PythonRDD.scala:346 []
 | ShuffledRDD[13] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[12] at reduceByKey at <stdin>:3 []
 | PythonRDD[11] at reduceByKey at <stdin>:3 []
 | MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:-2 []
 | file:////home/holden/repos/spark/README.md HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:-2 []
Let’s use toDebugString
cached:
>>> print word_count.toDebugString()
(2) PythonRDD[8] at RDD at PythonRDD.scala:43 []
 | MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:346 []
 | ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[3] at reduceByKey at <stdin>:3 []
 | PythonRDD[2] at reduceByKey at <stdin>:3 []
 | MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
 |     CachedPartitions: 2; MemorySize: 2.7 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 | file:////home/holden/repos/spark/README.md HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []
A detour into the internals Photo by Bill Ward
Why lazy evaluation? Allows pipelining procedures Fewer passes over our data, extra happiness Can skip materializing intermediate results which are really really big* Figuring out where our code fails becomes a little trickier
So what happens when we run this code? Driver Worker Worker Worker HDFS / Cassandra/ etc
So what happens when we run this code? Driver Worker Worker Worker HDFS / Cassandra/ etc function
So what happens when we run this code? Driver Worker Worker Worker HDFS / Cassandra/ etc read read read
So what happens when we run this code? Driver Worker Worker Worker HDFS / Cassandra/ etc cached cached cached counts
Spark in Scala, how does PySpark work? Py4J + pickling + magic This can be kind of slow sometimes RDDs are generally RDDs of pickled objects Spark SQL (and DataFrames) avoid some of this
So what does that look like? Driver py4j Worker 1 Worker K pipe pipe
Using other libraries Built-ins: just import!* *Except for Hive: compile with -Phive & then import Spark packages: add with --packages Generic Python libraries: pre-install on workers (pssh, puppet, etc.), add with --py-files, or sc.addPyFile
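A hedged sketch of the generic-Python options (the file and package names below are examples, not requirements):

  sc.addPyFile("my_helpers.py")   # ship an extra module to the workers from the driver
  # or at submit time:
  #   ./bin/spark-submit --py-files my_helpers.zip my_job.py
  #   ./bin/spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 my_job.py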
So let’s take “DataFrames” out for a spin useful for structured data support schema inference on JSON Many operations done without* pickling Integrated into ML! Accessed through SQLContext Not the same feature set as Pandas’ or R DataFrames
Loading data
df = sqlContext.read.load(
    "files/testweet.json",  # From learning-spark-examples
    format="json")          # Built in: json, parquet, etc.
# More formats (csv, etc.) at http://spark-packages.org/
DataFrames aren’t quite as lazy... Keep track of schema information Loading JSON data involves looking at the data Before, trying to load non-existent data wouldn’t fail right away; now it fails right away
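A quick illustration of that difference (the missing paths here are made up):

  sc.textFile("file:///no/such/file.txt")                           # no error yet: RDDs stay lazy
  sqlContext.read.load("file:///no/such/file.json", format="json")  # fails immediately: schema inference reads the data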
Examining Schema Information
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- inReplyToStatusId: long (nullable = true)
 |-- inReplyToUserId: long (nullable = true)
 |-- isFavorited: boolean (nullable = true)
 |-- isPossiblySensitive: boolean (nullable = true)
 |-- isTruncated: boolean (nullable = true)
 |-- mediaEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- retweetCount: long (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
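A schema tree like the one above comes out of printSchema() (a minimal sketch, reusing the tweet file loaded earlier):

  df = sqlContext.read.load("files/testweet.json", format="json")
  df.printSchema()   # prints the inferred schema, nested fields and all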
Manipulating DataFrames SQL df.registerTempTable("panda") sqlContext.sql("select * from panda where id = 529799371026485248") API df.filter(df.id == 529799371026485248)
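Both routes are lazy and return a new DataFrame; an action such as show() or collect() is what runs the query. A sketch using columns from the tweet schema above (assumes the "panda" temp table registered earlier):

  sqlContext.sql("select text from panda where retweetCount > 10").show()
  df.filter(df.retweetCount > 10).select("text").show()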
DataFrames to RDDs & vice versa map lets us work per-row df.map(lambda row: row.text) Converting back: infer the schema, or specify the schema
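A sketch of both directions on the Spark 1.x API (the length column is just for illustration):

  from pyspark.sql import Row
  from pyspark.sql.types import StructType, StructField, StringType, IntegerType

  texts = df.map(lambda row: row.text)                 # per-row work gives us back an RDD
  rows = texts.map(lambda t: Row(text=t, length=len(t)))
  inferred = sqlContext.createDataFrame(rows)          # schema inferred from the Row objects
  schema = StructType([StructField("text", StringType()),
                       StructField("length", IntegerType())])
  explicit = sqlContext.createDataFrame(texts.map(lambda t: (t, len(t))), schema)  # schema spelled out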
Or we can make a UDF def function(x): # Some magic sqlContext.registerFunction("name", function, IntegerType()) Or in Scala: def func(a: String): Int = //Magic sqlContext.udf.register("name", func)
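Calling a registered UDF from SQL, sketched with a made-up shout() function (assumes the "panda" temp table from earlier):

  from pyspark.sql.types import StringType

  def shout(text):
      return text.upper()

  sqlContext.registerFunction("shout", shout, StringType())
  sqlContext.sql("select shout(text) from panda").show()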
More exercise funtimes :) Let’s load a sample tweet Write a UDF to compute the length of the tweet Select the length of the tweet
Additional Resources Programming guide (along with JavaDoc, PyDoc, ScalaDoc, etc.) http://spark.apache.org/docs/latest/ Books Videos Our next meetup! Spark Office Hours follow me on twitter for future ones - https://twitter.com/holdenkarau fill out this survey to choose the next date - http://bit.ly/spOffice1
Learning Spark Fast Data Processing with Spark (Out of Date) Fast Data Processing with Spark Advanced Analytics with Spark Coming soon: Spark in Action
Spark Videos Apache Spark YouTube Channel My YouTube Spark videos - http://bit.ly/1MsvUKo Spark Summit 2014 training Paco’s Introduction to Apache Spark


Editor's Notes

  • #7 Photo: https://www.flickr.com/photos/nevernotfocused/14710283621/
  • #13 & #15 We can examine how RDDs work in practice with the traditional word count example. If you’ve taken another intro to big data class, or just worked with MapReduce, you’ll notice that this is a lot less code than we normally have to write.
  • #16 Panda photo: https://www.flickr.com/photos/dannydansoy/14796219847/ Beaver photo: https://www.flickr.com/photos/sherseydc/2452702213/
  • #17 Photo: https://www.flickr.com/photos/18521020@N04/16907107492/ (backup: https://www.flickr.com/photos/18521020@N04/16908361265/)
  • #19 Should be ~28 (unless running a different version of Spark)
  • #24 Photo: https://www.flickr.com/photos/billward/508211284/
  • #24 https://www.flickr.com/photos/billward/508211284/in/photolist-87LCUa-87PQ6A-87LC44-87PPWs-87LD54-87PPDo-87LBY2-87LCqB-87LBD6-87LCWH-87PQML-87LCRT-7GYBRK-6ZhCV4-bEjtfp-qVRG3a-7gcxPZ-3zxGY6-9Un3j4-f3mrBZ-thSTC9-e214LM-dEDTg3-7TqRQU-7TqRNN-e26FZb-6sjCuP-86656v-7H3xJd-dovrrt-7H3ycb-91otqR-4uiXe5-4ueUy2-7H3y4J-LUHvw-LUS7x-7GYCor-7GYCa8-7H3x7A-7GYCjk-7H3xCh-7GYCMV-dUuL8X-dUAnK7-dUuLut-dUAnU5-dUAnAA-dUAofC-dUAneN