
I am running a Kafka stream reader on Spark.

Following are the dependencies:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
</dependencies>

When some data, say 'Hi----3', is produced to the Kafka topic, I get the following exception (I can see the data in the exception):

Serialization stack: - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3")) 

I am not doing any computation on the RDD (that was also throwing the same exception). Even stream.print() throws the exception.

Following is the code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

class Metrics {
  def readKafka() {
    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array("q_metrics")
    val sc = new SparkContext("local[4]", "ScalaKafkaConsumer")
    val streamingContext = new StreamingContext(sc, Seconds(10))

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    stream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

  def rddReader(rdd: Array[String]) = {
  }
}

object MetricsReader {
  def main(args: Array[String]): Unit = {
    val objMetrics = new Metrics()
    objMetrics.readKafka()
  }
}

Appreciate any help.

Thanks

  • Are you able to receive the message on the Kafka consumer console? Commented Dec 1, 2016 at 10:58
  • No. I see that message as part of the exception. Commented Dec 1, 2016 at 11:01
  • I think you need to add some jars to your kafka/lib location, like metrics-core and kafka-clients, if they are not there. Commented Dec 1, 2016 at 11:05
  • Can you provide the list of your kafka/lib jars? Commented Dec 1, 2016 at 11:05
  • Both metrics-core-3.1.2.jar and kafka-clients-0.10.0.1.jar are in the classpath. I can't paste the whole set, as it is huge. Commented Dec 1, 2016 at 11:36

1 Answer


Found the issue: we cannot call print() on the stream directly, because print() internally calls take(), which ships the ConsumerRecord objects to the driver, and ConsumerRecord is not serializable. So I used map to extract the key and value from each record, collected them, and then printed:

stream.foreachRDD { rdd =>
  val collected = rdd.map(record => (record.key(), record.value())).collect()
  for (c <- collected) {
    println(c)
  }
}
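
For the same reason, mapping the stream to plain (key, value) tuples before printing should also avoid the exception, since print() then only has to serialize String tuples rather than ConsumerRecord objects. A minimal sketch of that variant, assuming the same stream as above:

// Convert each non-serializable ConsumerRecord into a serializable
// (String, String) tuple before print() ships elements to the driver.
stream.map(record => (record.key(), record.value())).print()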