
I am trying to write a Kafka consumer (of a protobuf message) using Structured Streaming. Let's call the protobuf A; it arrives as a byte array (Array[Byte]) in Scala and needs to be deserialized.

I have tried every method I could find online, but I still cannot figure out how to correctly parse message A.

Method 1: Following the integration guide (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html), I should cast value as a String. But even when I then call getBytes to convert the string back to bytes in order to parse message A, I get:

    Exception in thread "main" java.lang.ExceptionInInitializerError
    ...
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
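For what it's worth, this particular failure looks like a classpath conflict rather than a casting problem: jackson-module-scala refuses to initialize when it finds a newer jackson-databind (here 2.9.8) than the 2.6.x line that Spark 2.2 bundles. A sketch of the usual sbt fix, assuming an sbt build (check the jars in your Spark distribution to confirm the exact version to pin):

    // build.sbt: force jackson-databind back to the version Spark 2.2.x ships with
    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.5"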

Method 2: I tried to cast value directly to a byte array, but I get:

    missing ')' at '['(line 1, pos 17)

    == SQL ==
    CAST(key AS Array[Byte])
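As an aside, Array[Byte] is a Scala type, not a Spark SQL type, which is why the parser rejects the [. The SQL name for this type is BINARY, so a cast that does parse would look like the sketch below (though, as the answer below shows, key and value already come out of the Kafka source as binary, so the cast is redundant):

    // `df` is the DataFrame returned by spark.readStream.format("kafka")...load()
    val bytes = df.selectExpr("CAST(key AS BINARY) AS key", "CAST(value AS BINARY) AS value")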

Method 3: I followed (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html) and wrote my own protobuf deserializer, but I got this error message:

    Schema for type A is not supported
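For context, this error generally means Spark cannot derive an Encoder for the protobuf-generated class A, so a Dataset[A] (or a UDF returning A) is rejected. The usual workaround is to parse the bytes and immediately project the message into a plain case class of supported types. A minimal sketch, where AView and the fields id/name are hypothetical stand-ins for A's real fields:

    import org.apache.spark.sql.{Dataset, SparkSession}

    // Hypothetical mirror of A's fields, using types Spark can encode.
    case class AView(id: Long, name: String)

    // `raw` holds the Kafka value bytes; A.parseFrom is the protoc-generated parser.
    def parse(raw: Dataset[Array[Byte]])(implicit spark: SparkSession): Dataset[AView] = {
      import spark.implicits._
      raw.map { bytes =>
        val a = A.parseFrom(bytes)
        AView(a.getId, a.getName) // hypothetical getters
      }
    }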

These three methods are essentially everything I could find online. This should be a simple and common problem, so if anyone has insight into it, please let me know.

Thanks!

1 Answer

The schema of the DataFrame created from the streaming source is:

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)
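(This is just what printSchema() reports for the raw Kafka source; a minimal sketch to reproduce it, with placeholder broker address and topic name:)

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("kafka-schema").getOrCreate()

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "my-topic")                      // placeholder
      .load()
      .printSchema()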

So key and value actually arrive as Array[Byte]. You will have to perform the deserialization in DataFrame operations.

For example, this is what I have for Kafka deserialization:

    import scala.util.Try // needed for the Try-based error handling below
    import sparkSession.implicits._

    sparkSession.readStream
      .format("kafka")
      .option("subscribe", topic)
      .option("kafka.bootstrap.servers", bootstrapServers)
      .load()
      .selectExpr("key", "value") // selecting only key & value
      .as[(Array[Byte], Array[Byte])]
      .flatMap { case (key, value) =>
        for {
          deserializedKey   <- Try { keyDeserializer.deserialize(topic, key) }.toOption
          deserializedValue <- Try { valueDeserializer.deserialize(topic, value) }.toOption
        } yield (deserializedKey, deserializedValue)
      }

You will need to modify that to deserialize your protobuf records.
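For protobuf, a minimal sketch of that modification, assuming A is the protoc-generated message class: wrap its parser in Kafka's Deserializer interface so it can serve as the valueDeserializer above.

    import java.util
    import org.apache.kafka.common.serialization.Deserializer

    // Sketch: a Kafka Deserializer backed by the protoc-generated parser for A.
    class ADeserializer extends Deserializer[A] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def deserialize(topic: String, data: Array[Byte]): A = A.parseFrom(data)
      override def close(): Unit = ()
    }

One caveat: whatever the flatMap yields must still be a type Spark can encode, and the protobuf class A itself is not one (that is exactly the "Schema for type A is not supported" error from Method 3). So project the parsed message into a tuple or case class of supported types inside the flatMap rather than returning A directly.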
