
I came across Structured Streaming in Spark; it has an example of continuously consuming from an S3 bucket and writing the processed results to a MySQL DB.

```scala
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
  .writeStream.format("jdbc")
  .start("jdbc:mysql://...")
```
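As far as I know, Apache Spark does not ship a built-in `jdbc` streaming sink, so the snippet above reads more like a design preview than code that runs as-is. From Spark 2.4 on, one way to get the same effect is `foreachBatch`, which hands each micro-batch to the ordinary batch `DataFrameWriter.jdbc`. This is a minimal sketch under assumptions: the log schema (`action`, `time`), the 10-minute watermark, the MySQL URL, table name, credentials, and checkpoint path are all hypothetical, and a MySQL JDBC driver must be on the classpath.

```scala
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object S3ToMySql {
  // Hypothetical schema of the JSON log records; streaming file sources need it up front.
  val logSchema: StructType = new StructType()
    .add("action", StringType)
    .add("time", TimestampType)

  // Hypothetical connection details; replace with your own.
  val jdbcUrl = "jdbc:mysql://db-host:3306/analytics"
  val jdbcTable = "action_counts"

  def jdbcProps(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("s3-to-mysql").getOrCreate()

    // Same aggregation as the example, plus a watermark so append mode is allowed.
    val counts = spark.readStream
      .schema(logSchema)
      .json("s3://logs")
      .withWatermark("time", "10 minutes")
      .groupBy(col("action"), window(col("time"), "1 hour"))
      .count()

    // JDBC cannot store struct columns, so flatten the window before writing each batch.
    val writeBatch: (DataFrame, Long) => Unit = (batch, _) =>
      batch
        .select(
          col("action"),
          col("window.start").as("window_start"),
          col("window.end").as("window_end"),
          col("count"))
        .write
        .mode(SaveMode.Append)
        .jdbc(jdbcUrl, jdbcTable, jdbcProps("user", "password"))

    counts.writeStream
      .outputMode("append")
      .option("checkpointLocation", "/tmp/checkpoints/s3-to-mysql") // hypothetical path
      .foreachBatch(writeBatch)
      .start()
      .awaitTermination()
  }
}
```

With the watermark and `append` mode, each hourly window is written once, after it closes. In `update` mode the same window would be appended again every time its count changed, which would call for an upsert on the MySQL side instead.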

How can this be used with Spark Kafka Streaming?

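```scala
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
```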

Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?

2 Answers


Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?

Not yet. Spark 2.0.0 doesn't have Kafka sink support for Structured Streaming. This is a feature that should come out in Spark 2.1.0 according to Tathagata Das, one of the creators of Spark Streaming. Here is the relevant JIRA issue.

Edit: (29/11/2018)

Yes, it's possible with Spark 2.2 onwards.

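```scala
// `stream` here must be a streaming DataFrame/Dataset with a `value` column
// (and optionally a `key` column), not the DStream from the question
stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  // required unless spark.sql.streaming.checkpointLocation is set
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
```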
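Because the Kafka sink reads only a `value` column (string or binary) and an optional `key` column, an aggregate such as the hourly counts from the question has to be reshaped before `writeStream`. A sketch, assuming the input has an `action` column to use as the message key; the object name, broker list, topic, and checkpoint path are placeholders:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.streaming.StreamingQuery

object KafkaSinkSketch {
  // Use `action` as the key and serialize every column of the row as a JSON value.
  def toKafkaRecords(df: DataFrame): DataFrame =
    df.select(
      col("action").cast("string").as("key"),
      to_json(struct(df.columns.map(col): _*)).as("value"))

  def writeToKafka(df: DataFrame): StreamingQuery =
    toKafkaRecords(df).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
      .option("topic", "target-topic1")
      .option("checkpointLocation", "/tmp/checkpoints/to-kafka") // placeholder path
      .outputMode("update") // emit counts as they change
      .start()
}
```

Serializing the whole row with `to_json(struct(...))` keeps nested columns such as the `window` struct intact, so consumers get both the window bounds and the count in one message.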

Check this SO post (read and write on Kafka topic with Spark streaming) for more.

Edit: (06/12/2016)

Kafka 0.10 integration for Structured Streaming is now supported experimentally in Spark 2.0.2:

```scala
import spark.implicits._ // needed for .as[(String, String)]

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
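To combine this with the S3 example from the question: the Kafka source yields `key`/`value` bytes plus metadata columns, so the payload has to be parsed before the same `groupBy`/`window` aggregation applies. A minimal sketch, assuming each message value is a JSON object with `action` and ISO-8601 `time` fields (the schema and object name are hypothetical), printing results to the console sink; `from_json` needs Spark 2.1 or later:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, window}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object KafkaWindowedCounts {
  // Hypothetical payload, e.g. {"action":"login","time":"2016-12-06T10:05:00Z"}
  val eventSchema: StructType = new StructType()
    .add("action", StringType)
    .add("time", TimestampType)

  // Parse the Kafka value, then run the same hourly count as the S3 example.
  def hourlyCounts(kafkaDF: DataFrame): DataFrame =
    kafkaDF
      .select(from_json(col("value").cast("string"), eventSchema).as("event"))
      .select("event.*")
      .groupBy(col("action"), window(col("time"), "1 hour"))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-windowed-counts").getOrCreate()

    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()

    hourlyCounts(kafkaDF).writeStream
      .outputMode("complete") // print the full result table each trigger
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Keeping the transformation in a plain `DataFrame => DataFrame` function means it can be checked against a small batch DataFrame before it is wired to a live Kafka stream.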

3 Comments

Is there a way to track the progress of this feature, e.g. a Jira story or a feature request?
@ike_love Yes, you can find it here
Please refer to the following documentation for the Kafka (alpha) release: spark.apache.org/docs/latest/…

I had a similar issue with reading from a Kafka source and writing to a Cassandra sink. I created a simple project, kafka2spark2cassandra, and am sharing it in case it is helpful to anyone.

1 Comment

@Sokia - your project worked great: clean and self-contained. Thanks.
