
I'm trying to perform an aggregation followed by a self-join on a Structured Streaming DataFrame. Let's suppose the df looks as follows:

sourceDf.show(false)

+-----+-------+
|owner|fruits |
+-----+-------+
|Brian|apple  |
|Brian|pear   |
|Brian|melon  |
|Brian|avocado|
|Bob  |avocado|
|Bob  |apple  |
+-----+-------+

On a static DataFrame, it is easy:

val aggDf = sourceDf.groupBy($"owner").agg(collect_list(col("fruits")) as "fruitsA")

sourceDf.join(aggDf, Seq("owner")).show(false)

+-----+-------+-----------------------------+
|owner|fruits |fruitsA                      |
+-----+-------+-----------------------------+
|Brian|apple  |[apple, pear, melon, avocado]|
|Brian|pear   |[apple, pear, melon, avocado]|
|Brian|melon  |[apple, pear, melon, avocado]|
|Brian|avocado|[apple, pear, melon, avocado]|
|Bob  |avocado|[avocado, apple]             |
|Bob  |apple  |[avocado, apple]             |
+-----+-------+-----------------------------+

Unfortunately, I'm unable to figure out how to do this in the case of a streaming DataFrame, so I tried the following complete code, which uses Kafka for both source and sink:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}

object Test {

  val spark: SparkSession = SparkSession.builder().getOrCreate()

  import spark.implicits._

  val brokers = "kafka:9092"
  val inputTopic = "test.kafka.sink.input"
  val aggTopic = "test.kafka.sink.agg"
  val outputTopicSelf = "test.kafka.sink.output.self"
  val outputTopicTwo = "test.kafka.sink.output.two"

  val payloadSchema: StructType = new StructType()
    .add("owner", StringType)
    .add("fruits", StringType)

  val payloadSchemaA: StructType = new StructType()
    .add("owner", StringType)
    .add("fruitsA", StringType)

  var joinedDfSchema: StructType = _

  val sourceDf: DataFrame = Seq(
    ("Brian", "apple"),
    ("Brian", "pear"),
    ("Brian", "melon"),
    ("Brian", "avocado"),
    ("Bob", "avocado"),
    ("Bob", "apple")
  )
    .toDF("owner", "fruits")

  val additionalData: DataFrame = Seq(("Bob", "grapes")).toDF("owner", "fruits")

  def saveDfToKafka(df: DataFrame): Unit = {
    df
      .select(to_json(struct(df.columns.map(column): _*)).alias("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", inputTopic)
      .save()
  }

  // save data to kafka (batch)
  saveDfToKafka(sourceDf)

  // kafka source
  val farmDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("startingOffsets", "earliest")
    .option("subscribe", inputTopic)
    .load()
    .byteArrayToString("value")
    .withColumn("value", from_json($"value", payloadSchema))
    .expand("value")

  farmDF.printSchema()

  implicit class DFHelper(df: DataFrame) {
    def expand(column: String): DataFrame = {
      val wantedColumns = df.columns.filter(_ != column) :+ s"$column.*"
      df.select(wantedColumns.map(col): _*)
    }

    def byteArrayToString(column: String): DataFrame = {
      val selectedCols = df.columns.filter(_ != column) :+ s"CAST($column AS STRING)"
      df.selectExpr(selectedCols: _*)
    }
  }

  def testSelfAggJoinFail(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .groupBy($"owner")
      .agg(collect_list(col("fruits")) as "fruitsA")

    // joined df
    val joinedDF = farmDF
      .join(myFarmDF.as("myFarmDF"), Seq("owner"))
      .select("owner", "fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testSelfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        window($"timestamp", "30 seconds", "15 seconds"),
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA", "window")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .withWatermark("timestamp", "30 seconds")
      .join(
        myFarmDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.window.start AND
            |farmDF.timestamp <= myFarmDF.window.end
          """.stripMargin))
      .select("farmDF.owner", "farmDF.fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testTwoDfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA")

    // save the aggregated df to kafka
    myFarmDF
      .select(to_json(struct(myFarmDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("update")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointAgg")
      .option("topic", aggTopic)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)

    // read the aggregated df from kafka as a stream
    val aggDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("startingOffsets", "earliest")
      .option("subscribe", aggTopic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", payloadSchemaA))
      .expand("value")
      .withWatermark("timestamp", "30 seconds")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .join(
        aggDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.timestamp - interval 1 hour AND
            |farmDF.timestamp <= myFarmDF.timestamp + interval 1 hour
          """.stripMargin))
      .select("farmDF.owner", "myFarmDF.fruitsA", "farmDF.fruits")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointTwo")
      .option("topic", outputTopicTwo)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def data(topic: String): DataFrame = {
    // let's read back the output topic using kafka batch
    spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", joinedDfSchema))
      .expand("value")
  }
}

Now, if I test on a Streaming DataFrame:

scala> Test.testSelfAggJoinFail
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [structstojson(named_struct(owner, owner#59, fruits, fruits#60, fruitsA, fruitsA#78), Some(Etc/UTC)) AS value#96]
+- Project [owner#59, fruits#60, fruitsA#78]
   +- Project [owner#59, key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, fruits#60, fruitsA#78]
      +- Join Inner, (owner#59 = owner#82)
         :- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#59, value#51.fruits AS fruits#60]
         :  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
         :     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
         :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]
         +- SubqueryAlias myFarmDF
            +- Aggregate [owner#82], [owner#82, collect_list(fruits#83, 0, 0) AS fruitsA#78]
               +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#82, value#51.fruits AS fruits#83]
                  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
                     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:110)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:235)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
  at Test$.testSelfAggJoinFail(<console>:123)
  ... 51 elided

it fails with Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark because I don't use watermarks.

Now, if I run the second test with

Test.testSelfAggJoin 

I get these warnings

2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end

And I can check the result with

Test.data(Test.outputTopicSelf).show(false)

2018-09-12 16:08:01 WARN NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 2 : {test.kafka.sink.output.self=LEADER_NOT_AVAILABLE}
2018-09-12 16:08:01 WARN NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 6 : {test.kafka.sink.output.self=LEADER_NOT_AVAILABLE}

+---+-----+---------+------+---------+-------------+-----+------+-------+
|key|topic|partition|offset|timestamp|timestampType|owner|fruits|fruitsA|
+---+-----+---------+------+---------+-------------+-----+------+-------+
+---+-----+---------+------+---------+-------------+-----+------+-------+

which returns an empty DataFrame (probably because of the warnings?). I was unable to find a working solution with a self-join.

Finally, I tried sinking the aggregation to Kafka and re-reading it as a second streaming DataFrame, as in:

scala> Test.data(Test.outputTopicTwo).show(false)

+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

which works (although not very efficiently, I'd say), but if I add additional data to the source topic:

scala> Test.saveDfToKafka(Test.additionalData)

scala> Test.data(Test.outputTopicTwo).show(false)

+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
|null|test.kafka.sink.output.two|0        |6     |2018-09-12 16:59:37.125|0            |Bob  |["apple","avocado"]               |grapes |
|null|test.kafka.sink.output.two|0        |7     |2018-09-12 16:59:40.001|0            |Bob  |["apple","avocado","grapes"]      |apple  |
|null|test.kafka.sink.output.two|0        |8     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |avocado|
|null|test.kafka.sink.output.two|0        |9     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |grapes |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

I get many more rows than expected, probably because I had to use .outputMode("update") when sinking the aggregation Df: the join then sees both the old aggregate ["apple","avocado"] and the updated one ["apple","avocado","grapes"] for Bob.

  • Is there a way to perform this aggregation without sending the aggregation back to Kafka as a separate topic?
  • If not, is it possible to modify testTwoDfAggJoin to use .outputMode("append")?
  • My understanding is that you should use the same timestamp column for the watermark and the groupBy, otherwise you get the unsupported-operation error. Maybe try adding withColumn("timestamp", current_timestamp()) and groupBy("timestamp", "owner"). In general, append output mode with aggregations is not a recommended approach. As far as I understand, multi-phase processing may help with this kind of task. Hope it helps! Commented Sep 12, 2018 at 17:34
  • I tried withColumn("timestamp", current_timestamp()) and groupBy("timestamp", "owner") and the result is the same: an empty Df. I also tried without the join, to test the aggregation part alone, and it's still empty, so apparently the problem is in the aggregation. Test.sourceDf.withColumn("timestamp", current_timestamp()).groupBy($"timestamp", $"owner").agg(collect_list(col("fruits")) as "fruitsA") works perfectly on the static Df. Commented Sep 13, 2018 at 10:52
  • Yeah, I had the same empty output in my case with a different aggregation. Once I changed it to update mode I got output. I have strong doubts that you will be able to do it in append mode. My understanding is that append mode is only for map-like operations, e.g. filter, transforming entries, etc. I believe that multi-phase processing, with a save to Kafka and a read back, is the best way for you, so the answer to both your questions would be "No". Try creating another stream for the aggregated data and grouping by time window. Commented Sep 13, 2018 at 16:25
  • Is there any workaround for this? Commented Jan 19, 2019 at 21:44

2 Answers


As of Spark 2.3, a join of two streaming DataFrames is not possible when aggregate functions are involved before the join.

From the Spark documentation:

Additional details on supported joins:

  • Joins can be cascaded, that is, you can do df1.join(df2, ...).join(df3, ...).join(df4, ....).
  • As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
  • As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used:
      • Cannot use streaming aggregations before joins.
      • Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
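
To make that concrete, here is a minimal sketch (not your code; hypothetical topic names) of the shape Spark 2.3 accepts versus the shape it rejects:

// Minimal sketch of the Spark 2.3 constraint (hypothetical topic names):
// map-like operations before a stream-stream join are fine, a streaming
// aggregation before the join is not.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

def readTopic(topic: String) = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", topic)
  .load()
  .selectExpr("CAST(value AS STRING) AS owner", "timestamp")

val left  = readTopic("stream.left")   // hypothetical topics
val right = readTopic("stream.right").withColumnRenamed("timestamp", "ts")

// Supported: only projections/filters (map-like operations) before the join,
// with the query started in Append output mode.
val okJoin = left.join(right, Seq("owner"))

// Not supported as of Spark 2.3: a streaming aggregation before the join, e.g.
//   left.groupBy($"owner").agg(collect_list($"owner") as "all").join(right, Seq("owner"))
// is rejected by the unsupported-operation check when the query is started.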


I encountered a similar error. The outputMode matters for aggregations; I solved it by adding df.writeStream.outputMode(OutputMode.Update()) or df.writeStream.outputMode(OutputMode.Complete()).
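
For reference, a minimal sketch of that fix (assuming a Kafka source similar to the question's; console sink just for illustration):

// Minimal sketch: a streaming aggregation started in Update output mode
// (Complete also works); the default Append mode would require a watermark
// on the aggregation.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "test.kafka.sink.input")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .groupBy($"value")
  .count()

counts.writeStream
  .outputMode(OutputMode.Update())   // or OutputMode.Complete()
  .format("console")
  .start()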

Ref:

Output Modes

There are a few types of output modes.

Append mode (default) - This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. This is supported for only those queries where rows added to the Result Table is never going to change. Hence, this mode guarantees that each row will be output only once (assuming fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.

Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.

http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-3/

