
Another Not Serializable exception when integrating Spark SQL and Spark Streaming

My source code:

public static void main(String args[]) {
    SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    JavaSparkContext jc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
    jssc.addStreamingListener(new WorkCountMonitor());

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> data =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    data.print();

    JavaDStream<Person> streamData = data.map(new Function<Tuple2<String, String>, Person>() {
        public Person call(Tuple2<String, String> v1) throws Exception {
            String[] stringArray = v1._2.split(",");
            Person person = new Person();
            person.setName(stringArray[0]);
            person.setAge(stringArray[1]);
            return person;
        }
    });

    final JavaSQLContext sqlContext = new JavaSQLContext(jc);
    streamData.foreachRDD(new Function<JavaRDD<Person>, Void>() {
        public Void call(JavaRDD<Person> rdd) {
            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
            subscriberSchema.registerAsTable("people");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
            System.out.println("afterwards");

            List<String> males = names.map(new Function<Row, String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();

            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });

    jssc.start();
    jssc.awaitTermination();
}

JavaSQLContext is also declared outside the foreachRDD loop, but I am still getting a NotSerializableException:

14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.map(RDD.scala:271)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 20 more

I would appreciate any suggestions.

2 Answers

Have you implemented the Serializable interface in your Person POJO class? Also, can you try declaring topicMap as final?
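For reference, a minimal sketch of what a serializable Person POJO could look like; the field names and the String-typed age are assumptions based on the setName/setAge calls in the question:

import java.io.Serializable;

// Hypothetical sketch of the Person POJO; fields inferred from the question's code.
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private String age; // the question passes age in as a String

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
}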



Here is the working code. The key change is that the Spark contexts (including the non-serializable JavaSQLContext) are held as transient fields of a Serializable enclosing class, so they are skipped when Spark serializes the closures that reference them:

package com.basic.spark;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class NumberCount implements Serializable {

    // The contexts and the Kafka producer are transient: the anonymous Function
    // instances below hold an implicit reference to this enclosing object, so when
    // Spark serializes a closure it serializes NumberCount too. Transient fields
    // are skipped, which keeps the non-serializable JavaSQLContext out of the
    // serialized closure.
    transient SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    transient JavaSparkContext jc = new JavaSparkContext(sparkConf);
    transient JavaStreamingContext jssc_1 = new JavaStreamingContext(jc, new Duration(1000));
    transient JavaSQLContext sqlContext = new JavaSQLContext(jc);
    transient Producer<String, String> producer = configureKafka();

    public static void main(String args[]) {
        (new NumberCount()).job_1(args);
    }

    public void job_1(String... args) {
        jssc_1.addStreamingListener(new WorkCountMonitor());
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> data =
                KafkaUtils.createStream(jssc_1, args[0], args[1], topicMap);
        data.window(new Duration(10000), new Duration(2000));

        JavaDStream<String> streamData = data.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> v1) {
                return v1._2;
            }
        });

        streamData.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) {
                if (rdd.count() < 1) return null;
                try {
                    // This runs on the driver, where the transient sqlContext is available.
                    JavaSchemaRDD eventSchema = sqlContext.jsonRDD(rdd);
                    eventSchema.registerTempTable("event");
                    System.out.println("all data");
                    JavaSchemaRDD names =
                            sqlContext.sql("SELECT deviceId, count(*) FROM event GROUP BY deviceId");
                    System.out.println("afterwards");

                    List<String> deviceDetails = names.map(new Function<Row, String>() {
                        public String call(Row row) {
                            return row.getString(0) + ":" + row.getLong(1);
                        }
                    }).collect();

                    System.out.println("before for");
                    ArrayList<KeyedMessage<String, String>> messages =
                            new ArrayList<KeyedMessage<String, String>>();
                    for (String name : deviceDetails) {
                        System.out.println("**************" + name);
                        writeToKafka_1(messages, name);
                    }
                    producer.send(messages);
                } catch (Exception e) {
                    System.out.println("#ERROR_1# #" + rdd);
                    e.printStackTrace();
                }
                return null;
            }
        });

        jssc_1.start();
        jssc_1.awaitTermination();
    }

    public Producer<String, String> configureKafka() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("compression.codec", "2");
        props.put("request.required.acks", "0");
        props.put("producer.type", "sync");
        ProducerConfig config = new ProducerConfig(props);
        return new Producer<String, String>(config);
    }

    public void writeToKafka_1(ArrayList<KeyedMessage<String, String>> list, String msg) {
        list.add(new KeyedMessage<String, String>("my-replicated-topic-1", "", msg));
    }
}
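A common alternative, sketched below and not part of the original answer, is to avoid capturing the SQL context at all by constructing it inside foreachRDD from the RDD's own SparkContext, so the closure Spark checks never references a non-serializable object. This assumes the same old Spark 1.x Java API (JavaSQLContext/JavaSchemaRDD) used in this thread:

streamData.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) {
        // Built fresh on the driver for each batch, so no non-serializable
        // context is dragged into the serialized closure.
        JavaSQLContext sqlContext = new JavaSQLContext(new JavaSparkContext(rdd.context()));
        JavaSchemaRDD eventSchema = sqlContext.jsonRDD(rdd);
        eventSchema.registerTempTable("event");
        for (Row row : sqlContext.sql("SELECT deviceId, count(*) FROM event GROUP BY deviceId").collect()) {
            System.out.println(row.getString(0) + ":" + row.getLong(1));
        }
        return null;
    }
});

In later Spark releases the same idea is usually written with a lazily instantiated singleton context, e.g. SQLContext.getOrCreate(rdd.context()) from Spark 1.5 onwards.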

4 Comments

You helped the OP but not the community. The answer should have included at least one line explaining what was wrong and how you made it work.
What is OP? @Aditya, do you mean operations/production support?
Oh. OP is the person who made the first post in any online thread. On Stack Overflow, it is the person who asks the question. The SX community lives on questions and answers. When you fix someone's code, you're not helping anyone but the OP. Look around and you will see that whenever someone writes even the tiniest piece of code as an answer, they explain it - so that even those who don't use this particular language or don't see what's going on in the code can understand the solution. In the end, we don't want to offer a free coding or debugging service for OPs.
Edit your answer and add a little description (though it might be hard, as you wrote the answer several months ago). Add a comment and let's start upvoting you :)
