0

I have to write a single output file for my streaming job.

Question : when will my job actually stop? I killed the server but did not work. I want to stop my job from commandline(If it is possible)

Code:

 import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds import org.apache.spark._ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import scala.collection.mutable.ArrayBuffer object MAYUR_BELDAR_PROGRAM5_V1 { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(10)) val lines = ssc.socketTextStream("localhost", args(0).toInt) val words = lines.flatMap(_.split(" ")) val class1 = words.filter(a => a.charAt(0).toInt%2==0).map(a => a).filter(a => a.length%2==0) val class2 = words.filter(a => a.charAt(0).toInt%2==0).map(a => a).filter(a => a.length%2==1) val class3 = words.filter(a => a.charAt(0).toInt%2==1).map(a => a).filter(a => a.length%2==0) val class4 = words.filter(a => a.charAt(0).toInt%2==1).map(a => a).filter(a => a.length%2==1) class1.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class1","txt") class2.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class2", "txt") class3.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class3","txt") class4.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class4","txt") ssc.start() // Start the computation ssc.awaitTermination() ssc.stop() } } 
6
  • Unrelated, but what is the purpose of .map(a => a)? Commented Apr 13, 2017 at 21:59
  • @cricket_007 Please ignore that, I was just experimenting something. Commented Apr 13, 2017 at 22:00
  • I don't understand the problem. You are going to get cumulative output in each folder here for 30000 milliseconds Commented Apr 13, 2017 at 22:02
  • Spark doesn't write singular files unless you force one reducer... mbeldar/class1 is going to be a directory. Commented Apr 13, 2017 at 22:04
  • Yes you are right. But problem here is, I am getting class1_folder1, class1_folder2... something like this. And I just want single folder class1 Commented Apr 13, 2017 at 22:06

1 Answer 1

1

A stream by definition does not have an end so it will not stop unless you call the method to stop it. In my case I have a business condition that tell when the process is finished, so when I reach this point I'm calling the method JavaStreamingContext.close(). I also have a monitor that checks if the process has not received any data in the past few minutes in which case it will also close the stream.

In order to accumulate data you have to use the method updateStateByKey (on a PairDStream). This method requires checkpointing to be enabled.

I have checked the Spark code and found that saveAsTextFiles uses foreachRDD, so at the end it will save each RDD separately, so previous RDDs will not be taken into account. Using updateStateByKey it will still save multiple files, but each file will consider all RDDs that were processed before.

Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.