
I'm trying to read Kafka topics through Apache Spark Streaming and am not able to figure out how to transform the data in a DStream to a DataFrame and then store it in a temp table. The messages in Kafka are in Avro format and were created by Kafka JDBC Connect from a database. I have the code below, which works fine until it executes spark.read.json to read the JSON into a DataFrame.

package consumerTest

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import scala.util.parsing.json.{JSON, JSONObject}

object Consumer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> classOf[KafkaAvroDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "group.id" -> "sakwq",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false",
      "schema.registry.url" -> "http://<schema-registry>:8181"
    )

    val topics = Array("cdcemployee")

    val stream = KafkaUtils.createDirectStream[String, Object](
      ssc,
      PreferConsistent,
      Subscribe[String, Object](topics, kafkaParams)
    )

    val data = stream.map(record => {
      println(record.value.toString())
      record.value
      val df = spark.read.json(record.value.toString())
    })

    data.print();

    ssc.start()
    ssc.awaitTermination()
  }
}

I am getting a NullPointerException when executing the line val df = spark.read.json(record.value.toString()):

18/05/10 09:49:11 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
	at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
	at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
	at consumerTest.Consumer$.$anonfun$main$1(Consumer.scala:63)
	at consumerTest.Consumer$.$anonfun$main$1$adapted(Consumer.scala:60)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
18/05/10 09:49:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
	(same stack trace as above)

Also, here is the sample data that gets printed by the println(record.value.toString()) statement if I remove the spark.read.json line:

 {"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": "300", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": null, "PostalCode": null, "DeptCode": "", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 363785, "CDCTIMESTAMP": "2018-04-04 15:35:11:492 - 04:00", "CDCCHANGESEQ": 17, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 2, "EmpNum": 57, "LastName": "bobba2s", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "San Diego", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270} {"CDCTRANSACTIONID": 364688, "CDCTIMESTAMP": "2018-04-04 16:39:05:602 - 04:00", 
"CDCCHANGESEQ": 18, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 1, "EmpNum": 59, "LastName": "Bobba", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "Raleigh", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270} {"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": "CA", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": "NC", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} {"CDCTRANSACTIONID": 650254, "CDCTIMESTAMP": "2018-04-18 16:19:35:669 - 04:00", "CDCCHANGESEQ": 20, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null} 

Can anyone help me convert this to a DataFrame and store it temporarily in a table?

edit:

[screenshot of the error attached]

2 Answers


stream contains an RDD for each batch interval, so for each interval you can convert the RDD to a DataFrame as follows. (The NullPointerException comes from calling spark.read inside stream.map, which runs on the executors, where the SparkSession is not usable; foreachRDD runs on the driver.)

stream.foreachRDD(rddRaw => {
  val rdd = rddRaw.map(_.value.toString) // or rddRaw.map(_._2)
  val df = spark.read.json(rdd)
})

This should give you the dataframe as expected.
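
Since the question also asks about putting each batch into a temporary table, here is a minimal sketch of how that could look, assuming the same spark session and [String, Object] stream as in the question; the view name cdcemployee_batch and the columns in the SQL query are illustrative picks from the sample data, not anything prescribed by Spark:

stream.foreachRDD { rddRaw =>
  if (!rddRaw.isEmpty()) {
    // record.value is an Avro record; its toString() is the JSON shown in the question
    val jsonRdd = rddRaw.map(_.value.toString)
    val df = spark.read.json(jsonRdd)
    // register this micro-batch as a temporary view so it can be queried with SQL
    df.createOrReplaceTempView("cdcemployee_batch")
    spark.sql("SELECT EmpNum, City, DeptCode FROM cdcemployee_batch").show()
  }
}

Each interval overwrites the view, so a query only sees the records of the current micro-batch; if you need data to accumulate across batches you would have to append to a persistent store instead.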

Hope this helps!


10 Comments

Thanks, but that doesn't seem to work. I'm getting a compile-time error for the statement spark.read.json(rdd) that says: Error:(69, 27) overloaded method value json with alternatives: (jsonDataset: org.apache.spark.sql.Dataset[String])org.apache.spark.sql.DataFrame <and> (jsonRDD: org.apache.spark.rdd.RDD[String])org.apache.spark.sql.DataFrame <and> (jsonRDD: org.apache.spark.api.java.JavaRDD[String])org.apache.spark.sql.DataFrame <and> (paths: String*)org.apache.spark.sql.DataFrame <and> (path: String)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[Object]) val df = spark.read.json(rdd) (see the sketch after these comments)
I'm seeing the same error after removing val data =, unfortunately
Can you share the screenshot?
Added the screenshot to the question
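
The overloaded-method error above means spark.read.json received an RDD[Object] (the stream is typed [String, Object], so _.value is Object). A minimal sketch of one way around it, mapping the values to String and wrapping them in a Dataset[String], which matches the jsonDataset overload listed in the error; this assumes the spark session and stream from the question:

import org.apache.spark.sql.Encoders

stream.foreachRDD { rddRaw =>
  // map the Object values to String, then wrap them in a Dataset[String]
  val jsonDs = spark.createDataset(rddRaw.map(_.value.toString))(Encoders.STRING)
  val df = spark.read.json(jsonDs)
  df.show()
}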

PySpark

JSON data:

{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"} {"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"} {"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"} {"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"} 

PySpark code to read the above JSON data:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType, StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)

stream_data = ssc.textFileStream("/filepath/")

def readMyStream(rdd):
    if not rdd.isEmpty():
        df = spark.read.json(rdd)
        print('Started the Process')
        print('Selection of Columns')
        df = df.select('t1', 't2', 't3', 'timestamp').where(col("timestamp").isNotNull())
        df.show()

stream_data.foreachRDD(lambda rdd: readMyStream(rdd))
ssc.start()
ssc.stop()

