Analyzing Time Series Data with Apache Spark and Cassandra
Patrick McFadin
Chief Evangelist for Apache Cassandra, DataStax
@PatrickMcFadin
Apache Cassandra
Cassandra is…
• Shared nothing
• Masterless peer-to-peer
• Great scaling story
• Resilient to failure

Cassandra for Applications

Example: Weather Station
• Weather station collects data
• Cassandra stores in sequence
• Application reads in sequence
Queries supported
CREATE TABLE raw_weather_data (
 wsid text,
 year int,
 month int,
 day int,
 hour int,
 temperature double,
 dewpoint double,
 pressure double,
 wind_direction int,
 wind_speed double,
 sky_condition int,
 sky_condition_text text,
 one_hour_precip double,
 six_hour_precip double,
 PRIMARY KEY ((wsid), year, month, day, hour)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);

Get weather data given
• Weather Station ID
• Weather Station ID and Time
• Weather Station ID and Range of Time
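The three access patterns above can be modeled without a cluster. The sketch below is plain Python, not CQL or driver code: it treats one partition (one wsid) as a list of rows kept in the table's clustering order, newest first, and shows that each supported query is a contiguous slice of that list. The helper names (`make_partition`, `slice_partition`) and the sample readings are made up for illustration.

```python
from bisect import bisect_left, bisect_right

# One partition = all rows for a single wsid, sorted by the clustering
# columns (year, month, day, hour) in DESCENDING order, as in the table.
def make_partition(rows):
    """rows: iterable of ((year, month, day, hour), temperature)."""
    return sorted(rows, key=lambda r: r[0], reverse=True)

def slice_partition(partition, start, end):
    """Return rows with start <= clustering key <= end (inclusive).

    Because the partition is sorted, the result is one contiguous run,
    which is why range-of-time reads suit this data model.
    """
    # Negate the keys so bisect can search an ascending sequence.
    keys = [tuple(-k for k in row[0]) for row in partition]
    lo = bisect_left(keys, tuple(-k for k in end))
    hi = bisect_right(keys, tuple(-k for k in start))
    return partition[lo:hi]

# Hypothetical hourly readings for one station on 2008-12-01.
partition = make_partition(
    ((2008, 12, 1, hour), 10.0 + hour) for hour in range(24)
)

newest = partition[0]  # Weather Station ID only: newest row comes first
one_hour = slice_partition(partition, (2008, 12, 1, 6), (2008, 12, 1, 6))
morning = slice_partition(partition, (2008, 12, 1, 6), (2008, 12, 1, 9))
```

Here `one_hour` plays the role of "Weather Station ID and Time" and `morning` the role of "Weather Station ID and Range of Time"; both come back newest first because of the DESC clustering order.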
Aggregation Queries
CREATE TABLE daily_aggregate_temperature (
 wsid text,
 year int,
 month int,
 day int,
 high double,
 low double,
 mean double,
 variance double,
 stdev double,
 PRIMARY KEY ((wsid), year, month, day)
) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

Get temperature stats given
• Weather Station ID
• Weather Station ID and Time
• Weather Station ID and Range of Time

Example: Windsor, California, July 1, 2014
High: 73.4
Low: 51.4
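As a rough illustration of what one row of daily_aggregate_temperature holds, the sketch below computes the five statistics columns for a single station-day using only the Python standard library. The readings are invented (chosen so the high and low match the Windsor example), and the use of population variance rather than sample variance is an assumption; the slides do not say which one is meant.

```python
from statistics import mean, pvariance, pstdev

def daily_aggregate(temperatures):
    """Summarize one station-day of temperature readings."""
    return {
        "high": max(temperatures),
        "low": min(temperatures),
        "mean": mean(temperatures),
        # Population variance/stdev: an assumption, not stated in the deck.
        "variance": pvariance(temperatures),
        "stdev": pstdev(temperatures),
    }

# Hypothetical readings for one day at one weather station.
readings = [51.4, 55.0, 62.6, 73.4, 68.0, 57.2]
row = daily_aggregate(readings)
```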
Apache Spark
Map Reduce
Input Data → Map → Intermediate Data → Reduce → Output Data
Apache Spark
• Up to 100x faster than Hadoop MapReduce in memory (2-10x faster on disk)
• 2-5x less code
• Works out of the box on EMR
• Fault-tolerant distributed datasets
• Batch, iterative and streaming analysis
• In-memory and on-disk storage
• Integrates with most file and storage options
Spark Components
• Spark Core
• Spark SQL: structured
• Spark Streaming: real-time
• MLlib: machine learning
• GraphX: graph
The DAG
[Diagram: a job drawn as a directed acyclic graph of Step 1 through Step 5]
org.apache.spark.rdd.RDD
Resilient Distributed Dataset (RDD)
• Created through transformations on data (map, filter, ...) or other RDDs
• Immutable
• Partitioned
• Reusable

RDD Operations
• Transformations
  • Similar to the Scala collections API
  • Produce new RDDs
  • filter, flatMap, map, distinct, groupBy, union, zip, reduceByKey, subtract
• Actions
  • Require materialization of the records to generate a value
  • collect: Array[T], count, fold, reduce, ...
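The split between transformations and actions can be mimicked in a few lines of plain Python. The `MiniRDD` class below is a toy written for this illustration, not Spark's API: transformations only build a new object that remembers what to do, and nothing runs until an action asks for a value.

```python
class MiniRDD:
    """Toy stand-in for an RDD: immutable, lazy, built from a parent."""

    def __init__(self, source):
        self._source = source  # zero-argument callable returning an iterator

    # Transformations: return a NEW MiniRDD and do no work yet.
    def map(self, fn):
        return MiniRDD(lambda: (fn(x) for x in self._source()))

    def filter(self, pred):
        return MiniRDD(lambda: (x for x in self._source() if pred(x)))

    # Actions: walk the whole chain and produce a value.
    def collect(self):
        return list(self._source())

    def count(self):
        return sum(1 for _ in self._source())

calls = []

def to_fahrenheit(celsius):
    calls.append(celsius)  # records when the function really runs
    return celsius * 9 / 5 + 32

temps_c = MiniRDD(lambda: iter([10.0, 23.0, 31.0]))
warm_f = temps_c.map(to_fahrenheit).filter(lambda f: f > 70)

ran_before_action = len(calls)  # nothing has executed yet
result = warm_f.collect()       # the action triggers the work
```

Because each `MiniRDD` keeps a recipe rather than data, `temps_c` can be reused for other chains, which loosely mirrors the "Immutable" and "Reusable" bullets above.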
RDD Operations
[Diagram: Transformation and Action]
Cassandra and Spark
Great combo
• Store a ton of data
• Analyze a ton of data

Great combo
• Spark Streaming: near real-time
• SparkSQL: structured data
• MLlib: machine learning
• GraphX: graph analysis
[Diagram: the Spark Connector links these components to the raw_weather_data table in Cassandra]
[Diagram: a Master and a Worker; the Worker runs on a server and hosts several Executors]
Token Ranges 0-100
[Diagram: one Master and four Workers; each Worker sits with one token range (0-24, 25-49, 50-74, 75-99) and says "I will only analyze 25% of the data."]
[Diagram: the same four token ranges held twice, once on the Transactional side and once on the Analytics side]
[Diagram: on the Worker for token range 75-99, the Spark Connector runs
  SELECT * FROM keyspace.table WHERE token(pk) > 75 AND token(pk) <= 99
and the rows become the Spark partitions of a Spark RDD, handled by that Worker's Executors]
Spark Reads on Cassandra Awesome animation by DataStax’s own Russell Spitzer
Spark RDDs Represent a Large Amount of Data Partitioned into Chunks
[Animation: an RDD made of chunks 1 through 9 is spread across Node 1, Node 2, Node 3 and Node 4]
Cassandra Data is Distributed By Token Range
[Animation: a token ring running from 0 through 500 to 999 is divided among Node 1, Node 2, Node 3 and Node 4, shown first without vnodes and then with vnodes]
The Connector Uses Information on the Node to Make Spark Partitions
spark.cassandra.input.split.size 50
Reported density is 0.5
[Animation: Node 1 owns the token ranges 0-50, 120-220, 300-500 and 780-830. The connector builds four Spark partitions from them: partition 1 takes 120-220, the range 300-500 is split into 300-400 (partition 2) and 400-500 (partition 3), and the two small ranges 0-50 and 780-830 are combined into partition 4.]
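One way to read the animation: with a reported density of 0.5 rows per token and a split size of 50 rows, a Spark partition should cover about 100 tokens, so wide ranges get cut and narrow ranges get packed together. The function below is a simplified sketch of that idea in plain Python. It is not the connector's actual algorithm, and `plan_partitions` is a name invented here.

```python
def plan_partitions(token_ranges, density, split_size):
    """Group token ranges into partitions of roughly split_size rows.

    token_ranges: list of (start, end) token pairs owned by one node
    density: estimated rows per token
    split_size: target number of rows per Spark partition
    """
    tokens_per_partition = int(split_size / density)
    partitions, small = [], []

    for start, end in token_ranges:
        if end - start >= tokens_per_partition:
            # Large range: cut it into pieces of the target width.
            for lo in range(start, end, tokens_per_partition):
                partitions.append([(lo, min(lo + tokens_per_partition, end))])
        else:
            small.append((start, end))

    # Small ranges: pack them together until the target width is reached.
    bucket, bucket_width = [], 0
    for start, end in small:
        if bucket and bucket_width + (end - start) > tokens_per_partition:
            partitions.append(bucket)
            bucket, bucket_width = [], 0
        bucket.append((start, end))
        bucket_width += end - start
    if bucket:
        partitions.append(bucket)
    return partitions

# Token ranges and settings taken from the slide.
plan = plan_partitions(
    [(120, 220), (300, 500), (780, 830), (0, 50)],
    density=0.5,
    split_size=50,
)
```

With the slide's numbers this yields four partitions, the last of which holds both 780-830 and 0-50.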
Data is Retrieved Using the DataStax Java Driver
spark.cassandra.input.page.row.size 50
[Animation: for Spark partition 4 on Node 1, which holds the token ranges 0-50 and 780-830, the connector runs one query per range:
  SELECT * FROM keyspace.table WHERE token(pk) > 780 AND token(pk) <= 830
  SELECT * FROM keyspace.table WHERE token(pk) > 0 AND token(pk) <= 50
Results arrive in pages of 50 CQL rows, first for the 780-830 query and then for the 0-50 query.]
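The paging behaviour in the animation can be imitated with generators. This is a plain-Python sketch under simplifying assumptions: the result sets are in-memory lists with an invented size of 250 rows each, whereas a real driver fetches each page from the server on demand. The function names are hypothetical.

```python
def fetch_pages(rows, page_size):
    """Yield successive pages (lists) of at most page_size rows."""
    for offset in range(0, len(rows), page_size):
        yield rows[offset:offset + page_size]

def read_partition(range_results, page_size=50):
    """Read every token range of one Spark partition, one range at a time."""
    for token_range, rows in range_results:
        for page in fetch_pages(rows, page_size):
            yield token_range, page

# Hypothetical result sets: 250 rows for each token range.
range_results = [
    ((780, 830), [("row", i) for i in range(250)]),
    ((0, 50), [("row", i) for i in range(250)]),
]
pages = list(read_partition(range_results))
```

A smaller page size means more round trips but less memory held per fetch, which is the trade-off a setting like the one on the slide controls.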
Spark Connector
                             Cassandra   Cassandra + Spark
  Joins and Unions           No          Yes
  Transformations            Limited     Yes
  Outside Data Integration   No          Yes
  Aggregations               Limited     Yes
Type mapping
  CQL Type        Scala Type
  ascii           String
  bigint          Long
  boolean         Boolean
  counter         Long
  decimal         BigDecimal, java.math.BigDecimal
  double          Double
  float           Float
  inet            java.net.InetAddress
  int             Int
  list            Vector, List, Iterable, Seq, IndexedSeq, java.util.List
  map             Map, TreeMap, java.util.HashMap
  set             Set, TreeSet, java.util.HashSet
  text, varchar   String
  timestamp       Long, java.util.Date, java.sql.Date, org.joda.time.DateTime
  timeuuid        java.util.UUID
  uuid            java.util.UUID
  varint          BigInt, java.math.BigInteger
  * nullable values: Option
Attaching to Spark and Cassandra
  import org.apache.spark.{SparkContext, SparkConf}
  // Import Cassandra-specific functions on SparkContext and RDD objects
  import com.datastax.spark.connector._

  /** setMaster("local[*]") lets us run and test the job right in our IDE */
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setMaster("local[*]")
    .setAppName(getClass.getName)
    // Optionally
    .set("cassandra.username", "cassandra")
    .set("cassandra.password", "cassandra")

  val sc = new SparkContext(conf)
Weather station example
CREATE TABLE raw_weather_data (
 wsid text, 
 year int, 
 month int, 
 day int, 
 hour int, 
 temperature double, 
 dewpoint double, 
 pressure double, 
 wind_direction int, 
 wind_speed double, 
 sky_condition int, 
 sky_condition_text text, 
 one_hour_precip double, 
 six_hour_precip double, 
 PRIMARY KEY ((wsid), year, month, day, hour)
 ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
Simple example
  /** keyspace & table */
  val tableRDD = sc.cassandraTable("isd_weather_data", "raw_weather_data")

  /** get a simple count of all the rows in the raw_weather_data table */
  val rowCount = tableRDD.count()

  println(s"Total Rows in Raw Weather Table: $rowCount")
  sc.stop()

[Diagram: through the Spark Connector, an Executor runs SELECT * FROM isd_weather_data.raw_weather_data and the rows become a Spark partition of a Spark RDD]
Using CQL
  SELECT temperature
  FROM raw_weather_data
  WHERE wsid = '724940:23234'
  AND year = 2008
  AND month = 12
  AND day = 1;

  val cqlRDD = sc.cassandraTable("isd_weather_data", "raw_weather_data")
    .select("temperature")
    .where("wsid = ? AND year = ? AND month = ? AND day = ?",
      "724940:23234", 2008, 12, 1)
Using SQL
  SELECT wsid, year, month, day, max(temperature) high
  FROM raw_weather_data
  GROUP BY wsid, year, month, day;

Wait wut?
  SELECT w.name, w.lat, w.long
  FROM raw_weather_data r
  JOIN weather_station w
  ON w.id = r.wsid
  GROUP BY w.name, w.lat, w.long;
Python!
  from pyspark_cassandra import CassandraSparkContext, Row
  from pyspark import SparkContext, SparkConf
  from pyspark.sql import SQLContext  # needed for toDF()

  conf = (SparkConf()
          .setAppName("Weather App")
          .setMaster("spark://127.0.0.1:7077")
          .set("spark.cassandra.connection.host", "127.0.0.1"))

  sc = CassandraSparkContext(conf=conf)
  sql = SQLContext(sc)

  # Count rows per weather station
  temps = sc.cassandraTable("isd_weather_data", "raw_weather_data").toDF()
  station_counts = temps.groupBy("wsid").count()
Weather Station Analysis
• Weather station collects data
• Cassandra stores in sequence
• Spark rolls up data into new tables

Example: Windsor, California, July 1, 2014
High: 73.4
Low: 51.4
Roll-up table (SparkSQL example)
• Weather Station Id and Date are unique
• High and low temp for each day

  spark-sql> INSERT INTO TABLE
           > daily_aggregate_temperature
           > SELECT wsid, year, month, day, max(temperature) high, min(temperature) low
           > FROM raw_weather_data
           > GROUP BY wsid, year, month, day;
  OK
  Time taken: 2.345 seconds

CREATE TABLE daily_aggregate_temperature (
 wsid text,
 year int,
 month int,
 day int,
 high double,
 low double,
 mean double,
 variance double,
 stdev double,
 PRIMARY KEY ((wsid), year, month, day)
 ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);
What just happened
• Data is read from the raw_weather_data table
• Transformed
• Inserted into the daily_aggregate_temperature table
Read data from table → Transform → Insert data into table
Spark Streaming
Spark Versus Spark Streaming
• Spark: zillions of bytes
• Spark Streaming: gigabytes per second
[Diagram: Spark Streaming, with Kinesis and S3 among the labeled systems]
DStream - Micro Batches
[Diagram: a DStream drawn as a sequence of μBatches, each an ordinary RDD]
Processing of DStream = Processing of μBatches, RDDs

DStream
• Continuous sequence of micro batches
• More complex processing models are possible with less effort
• Streaming computations as a series of deterministic batch computations on small time intervals
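The micro-batch idea can be shown without Spark: cut a stream of timestamped events into fixed time windows, then run an ordinary batch computation on each window. The sketch below is plain Python; `micro_batches`, the one-second interval and the sample events are all assumptions made for illustration, not Spark Streaming's API.

```python
from collections import defaultdict

def micro_batches(events, batch_seconds):
    """Group (timestamp, value) events into fixed-width time windows."""
    batches = defaultdict(list)
    for timestamp, value in events:
        window_start = int(timestamp // batch_seconds) * batch_seconds
        batches[window_start].append(value)
    # Each batch is an ordinary list, processed like a small batch job.
    return [batches[start] for start in sorted(batches)]

# Hypothetical (seconds since start, temperature) readings.
events = [(0.2, 51.4), (0.9, 52.0), (1.1, 60.8), (2.5, 73.4), (2.7, 71.6)]
batches = micro_batches(events, batch_seconds=1)

# The same deterministic computation runs on every micro batch.
highs = [max(batch) for batch in batches]
```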
Now what?
[Diagram: a Cassandra-only DC alongside a Cassandra + Spark DC that runs Spark jobs and Spark Streaming]
You can do this at home! https://github.com/killrweather/killrweather
Thank you!
Bring the questions
Follow me on Twitter @PatrickMcFadin
