I'm trying to apply a `to_protobuf` transformation inside Spark Structured Streaming code that reads data from a Kafka topic.
The incoming DataFrame in

```scala
readStreamFromKafka(config).writeStream
  .foreachBatch { (kafkaBatch: DataFrame, batchId: Long) =>
    kafkaBatch.printSchema()
```

has the following schema:

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)
```

`kafkaBatch.show(truncate = false)` returns a non-null `value`:
```
+----+------------------------+
|key |value ....              |
+----+------------------------+
|null|[C0 E2 4F .... E2 4F    |
+----+------------------------+
```

The logical plan is:

```
StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ad7b8e9, KafkaV2[Subscribe[bg-intermediate]]
```

But my code:
```scala
kafkaBatch
  .select(
    from_avro(
      col("value"),
      readSchemaFromResource("/schemas/avro/schema1.avsc")
    ).as("value")
  )
  .select(
    to_protobuf(
      col("value"),
      "extractor.api.grpc.face.v1.BestShotSource",
      "/schemas/proto/schema1.proto"
    ).as("new_value")
  )
  .write
  .format("kafka")
  ....
```

returns the error:

```
AnalysisException: Required attribute 'value' not found
```

What am I doing wrong?
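For debugging, the intermediate schema that `from_avro` produces (before `to_protobuf` is applied) can be inspected with a sketch like this, reusing my `readSchemaFromResource` helper from above:

```scala
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// Sketch: look at what from_avro actually yields before calling to_protobuf.
// from_avro returns a single struct column, so after this select the batch
// has one column named "value" whose type is a struct of the Avro fields.
val decoded = kafkaBatch.select(
  from_avro(
    col("value"),
    readSchemaFromResource("/schemas/avro/schema1.avsc")
  ).as("value")
)

// Prints value: struct<...> with the decoded Avro fields nested inside it,
// rather than those fields as top-level columns of the DataFrame.
decoded.printSchema()
```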