
This is my source code, in which I receive data from the server side, which keeps generating a stream of data. For each RDD I then apply the SQL schema, and once the table is created I try to select something from this DStream.

    List<String> males = new ArrayList<String>();

    // Read the subscriber stream from the socket source.
    JavaDStream<String> data = streamingContext.socketTextStream("localhost", port);
    data.print();
    System.out.println("Socket connection established to read data from Subscriber Server");

    // Parse each comma-separated record into a SubscriberData bean.
    JavaDStream<SubscriberData> streamData = data.map(new Function<String, SubscriberData>() {
        public SubscriberData call(String record) {
            String[] stringArray = record.split(",");
            SubscriberData subscriberData = new SubscriberData();
            subscriberData.setMsisdn(stringArray[0]);
            subscriberData.setSubscriptionType(stringArray[1]);
            subscriberData.setName(stringArray[2]);
            subscriberData.setGender(stringArray[3]);
            subscriberData.setProfession(stringArray[4]);
            subscriberData.setMaritalStatus(stringArray[5]);
            return subscriberData;
        }
    });

    // For each batch, apply the schema, register a table, and query it.
    streamData.foreachRDD(new Function<JavaRDD<SubscriberData>, Void>() {
        public Void call(JavaRDD<SubscriberData> rdd) {
            JavaSQLContext sqlContext = new JavaSQLContext(sc);
            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, SubscriberData.class);
            subscriberSchema.registerAsTable("SUBSCRIBER_DIMENSION");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT msisdn FROM SUBSCRIBER_DIMENSION WHERE GENDER='Male'");
            System.out.println("afterwards");
            List<String> males = new ArrayList<String>();
            males = names.map(new Function<Row, String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });
    streamingContext.start();

But it throws this serialization exception, although the classes I'm using do implement Serializable.

    14/11/06 12:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1415258720000 ms.1
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:75)
        at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
        at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:206)
        at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 20 more
  • just a side note: Scala is much more concise and readable for working with Spark. Worth the learning effort. Commented Nov 6, 2014 at 14:49
  • Indeed, Scala is simpler than Java in terms of Spark coding standards. But the requirement needs to be fulfilled in Java. Thanks! Commented Nov 7, 2014 at 11:24
  • I have a similar Spark Streaming application, but mine crashes after 2 hours. Have you faced a similar problem? Thanks @Navin Ahmed Commented Nov 26, 2014 at 10:32
  • No bro. Haven't faced that kind of an issue as of now. Commented Nov 28, 2014 at 3:52

1 Answer


SparkContext is not serializable: it lives only on the driver and must NOT be included in any closure that Spark serializes and ships to the executors. In your code the Function passed to names.map is an anonymous inner class, so it carries a reference to its enclosing object, which in turn holds the JavaSparkContext sc; that is what closure serialization trips over. I'm afraid that support for SQL on Spark Streaming is only at the research stage at the moment. See this presentation from the Spark Summit for the details.
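
To make the rule concrete, here is a minimal sketch (the rows RDD is hypothetical; Function and Row are the same Spark 1.1-era Java API classes as in your code): everything a Function references from the enclosing scope gets serialized with it, so copy driver-side values into serializable locals rather than touching sc inside the closure.

    // Fails with "Task not serializable": the closure references sc,
    // so the driver-only JavaSparkContext is dragged into serialization.
    JavaRDD<String> broken = rows.map(new Function<Row, String>() {
        public String call(Row row) {
            return sc.appName() + ":" + row.getString(0);
        }
    });

    // Works: copy what the closure needs into a final, serializable local
    // on the driver first, so only that value is captured.
    final String appName = sc.appName();
    JavaRDD<String> ok = rows.map(new Function<Row, String>() {
        public String call(Row row) {
            return appName + ":" + row.getString(0);
        }
    });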

To create the intended RDD of ids of male subscribers, you can use filter and map:

    maleSubscribers = subscribers.filter(subsc => subsc.getGender == "Male")
                                 .map(subsc => subsc.getMsisdn)
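
In the Java API your code uses, the same chain would look roughly like this (a sketch assuming subscribers is a JavaRDD<SubscriberData>, like the rdd inside your foreachRDD):

    JavaRDD<String> maleSubscribers = subscribers
            // Keep only male subscribers.
            .filter(new Function<SubscriberData, Boolean>() {
                public Boolean call(SubscriberData subsc) {
                    return "Male".equals(subsc.getGender());
                }
            })
            // Project each subscriber down to its msisdn.
            .map(new Function<SubscriberData, String>() {
                public String call(SubscriberData subsc) {
                    return subsc.getMsisdn();
                }
            });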

1 Comment

I was able to apply SQL on streaming data. We are not supposed to declare the JavaSQLContext within foreachRDD; it has to be done outside it. Thanks for the help though. Appreciate it
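
For readers hitting the same error, a sketch of the restructuring this comment describes (Spark 1.1-era API, reusing the question's code): the JavaSQLContext is built once on the driver, outside foreachRDD, so sc itself is never referenced inside any closure.

    // Built once, outside foreachRDD, so no closure ever captures sc.
    final JavaSQLContext sqlContext = new JavaSQLContext(sc);

    streamData.foreachRDD(new Function<JavaRDD<SubscriberData>, Void>() {
        public Void call(JavaRDD<SubscriberData> rdd) {
            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, SubscriberData.class);
            subscriberSchema.registerAsTable("SUBSCRIBER_DIMENSION");
            JavaSchemaRDD names = sqlContext.sql(
                    "SELECT msisdn FROM SUBSCRIBER_DIMENSION WHERE GENDER='Male'");
            List<String> males = names.map(new Function<Row, String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });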
