Is there a way to combine these two examples without using `stream.foreachRDD(rdd => {})`?
Not yet. Spark 2.0.0 doesn't have a Kafka sink for Structured Streaming. According to Tathagata Das, one of the creators of Spark Streaming, this feature should arrive in Spark 2.1.0. Here is the relevant JIRA issue.
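In the meantime, one common workaround was to push rows to Kafka from a custom `ForeachWriter`. A minimal sketch, not an official recipe; the class name, broker string, and topic are placeholders:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Sketch: write each (key, value) row to Kafka from the foreach sink.
class KafkaSinkWriter(brokers: String, topic: String)
    extends ForeachWriter[(String, String)] {

  var producer: KafkaProducer[String, String] = _

  // Called on the executor once per partition: create the producer there,
  // so it is never serialized from the driver.
  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  // Send each row to the target topic.
  override def process(record: (String, String)): Unit = {
    producer.send(new ProducerRecord(topic, record._1, record._2))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}
```

It would then be attached with something like `stream.writeStream.foreach(new KafkaSinkWriter("host1:port1", "target-topic1")).start()`.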
Edit: (29/11/2018)
Yes, it's possible from Spark version 2.2 onwards.
```scala
stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()
```

Check this SO post (read and write on Kafka topic with Spark streaming) for more.
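Note that the Kafka sink expects the output rows to carry a string or binary `value` column (a `key` column is optional), and a streaming query writing to Kafka needs a checkpoint location. A minimal sketch of the shaping step, assuming a hypothetical `id` column on `stream` and a placeholder checkpoint path:

```scala
// Sketch: shape rows into key/value columns before handing them to the
// Kafka sink; `id` and the checkpoint path are placeholders.
stream
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()
```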
Edit: (06/12/2016)
Kafka 0.10 integration for Structured Streaming is now experimentally supported in Spark 2.0.2:
```scala
val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
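Since 2.0.2 only shipped the source side, the stream could at least be inspected with the console sink while waiting for a native Kafka sink. A small sketch reusing `ds1` from above:

```scala
// Sketch: print the Kafka-sourced rows to stdout via the console sink,
// pending Kafka sink support.
ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
```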