
I'm trying to implement an Avro-to-protobuf transformation inside Spark Structured Streaming code that reads data from a Kafka topic.

The incoming DataFrame:

 readStreamFromKafka(config).writeStream
   .foreachBatch { (kafkaBatch: DataFrame, batchId: Long) =>
     kafkaBatch.printSchema()

has following schema:

 root
  |-- key: binary (nullable = true)
  |-- value: binary (nullable = true)
  |-- topic: string (nullable = true)
  |-- partition: integer (nullable = true)
  |-- offset: long (nullable = true)
  |-- timestamp: timestamp (nullable = true)
  |-- timestampType: integer (nullable = true)
  |-- headers: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- key: string (nullable = true)
  |    |    |-- value: binary (nullable = true)

kafkaBatch.show(truncate = false) shows a non-null "value":

 +----+---------------------+
 |key |value ....           |
 +----+---------------------+
 |null|[C0 E2 4F .... E2 4F |
 +----+---------------------+

The logical plan is:

 StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ad7b8e9, KafkaV2[Subscribe[bg-intermediate]] 

But my code:

 kafkaBatch
   .select(
     from_avro(
       col("value"),
       readSchemaFromResource("/schemas/avro/schema1.avsc")
     ).as("value")
   )
   .select(
     to_protobuf(
       col("value"),
       "extractor.api.grpc.face.v1.BestShotSource",
       "/schemas/proto/schema1.proto"
     ).as("new_value")
   )
   .write
   .format("kafka")
   ....
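For context, Spark's Kafka sink requires the output DataFrame to contain a column literally named "value" (and optionally "key"), which may be what the analyzer is complaining about here since the final column is aliased "new_value". A minimal sketch of the same chain with the protobuf output aliased back to "value" — assuming the question's readSchemaFromResource helper and schema paths:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.protobuf.functions.to_protobuf

// Sketch only: decode the Avro payload, re-encode it as protobuf,
// and keep the column name "value" so the Kafka sink can find it.
def toProtobufBatch(kafkaBatch: DataFrame, avroSchemaJson: String): DataFrame =
  kafkaBatch
    .select(from_avro(col("value"), avroSchemaJson).as("value"))
    .select(
      to_protobuf(
        col("value"),
        "extractor.api.grpc.face.v1.BestShotSource",
        "/schemas/proto/schema1.proto"
      ).as("value") // the Kafka sink expects a column named "value"
    )
```

This is not a verified fix, just a sketch of how the alias would line up with the sink's expected schema.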

returns an error:

AnalysisException: Required attribute 'value' not found 

What am I doing wrong?

  • Can you comment out the second select and see whether the error comes from the first select or the second? – Commented Apr 2 at 11:08
