I'm trying to perform an aggregation followed by a self-join on a Structured Streaming DataFrame. Suppose the df looks as follows:
```scala
sourceDf.show(false)
```

```
+-----+-------+
|owner|fruits |
+-----+-------+
|Brian|apple  |
|Brian|pear   |
|Brian|melon  |
|Brian|avocado|
|Bob  |avocado|
|Bob  |apple  |
+-----+-------+
```

On a static DataFrame, it is easy:
```scala
val aggDf = sourceDf.groupBy($"owner").agg(collect_list(col("fruits")) as "fruitsA")

sourceDf.join(aggDf, Seq("owner")).show(false)
```

```
+-----+-------+-----------------------------+
|owner|fruits |fruitsA                      |
+-----+-------+-----------------------------+
|Brian|apple  |[apple, pear, melon, avocado]|
|Brian|pear   |[apple, pear, melon, avocado]|
|Brian|melon  |[apple, pear, melon, avocado]|
|Brian|avocado|[apple, pear, melon, avocado]|
|Bob  |avocado|[avocado, apple]             |
|Bob  |apple  |[avocado, apple]             |
+-----+-------+-----------------------------+
```

Unfortunately, I'm unable to figure out how to do this with a streaming DataFrame. So I tried the following complete code, which uses Kafka for both the source and the sink:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}

object Test {
  val spark: SparkSession = SparkSession.builder().getOrCreate()

  import spark.implicits._

  val brokers = "kafka:9092"
  val inputTopic = "test.kafka.sink.input"
  val aggTopic = "test.kafka.sink.agg"
  val outputTopicSelf = "test.kafka.sink.output.self"
  val outputTopicTwo = "test.kafka.sink.output.two"

  val payloadSchema: StructType = new StructType()
    .add("owner", StringType)
    .add("fruits", StringType)

  val payloadSchemaA: StructType = new StructType()
    .add("owner", StringType)
    .add("fruitsA", StringType)

  var joinedDfSchema: StructType = _

  val sourceDf: DataFrame = Seq(
    ("Brian", "apple"),
    ("Brian", "pear"),
    ("Brian", "melon"),
    ("Brian", "avocado"),
    ("Bob", "avocado"),
    ("Bob", "apple")
  ).toDF("owner", "fruits")

  val additionalData: DataFrame = Seq(("Bob", "grapes")).toDF("owner", "fruits")

  def saveDfToKafka(df: DataFrame): Unit = {
    df
      .select(to_json(struct(df.columns.map(column): _*)).alias("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", inputTopic)
      .save()
  }

  // save data to kafka (batch)
  saveDfToKafka(sourceDf)

  // kafka source
  val farmDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("startingOffsets", "earliest")
    .option("subscribe", inputTopic)
    .load()
    .byteArrayToString("value")
    .withColumn("value", from_json($"value", payloadSchema))
    .expand("value")

  farmDF.printSchema()

  implicit class DFHelper(df: DataFrame) {
    def expand(column: String): DataFrame = {
      val wantedColumns = df.columns.filter(_ != column) :+ s"$column.*"
      df.select(wantedColumns.map(col): _*)
    }

    def byteArrayToString(column: String): DataFrame = {
      val selectedCols = df.columns.filter(_ != column) :+ s"CAST($column AS STRING)"
      df.selectExpr(selectedCols: _*)
    }
  }

  def testSelfAggJoinFail(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .groupBy($"owner")
      .agg(collect_list(col("fruits")) as "fruitsA")

    // joined df
    val joinedDF = farmDF
      .join(myFarmDF.as("myFarmDF"), Seq("owner"))
      .select("owner", "fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testSelfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        window($"timestamp", "30 seconds", "15 seconds"),
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA", "window")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .withWatermark("timestamp", "30 seconds")
      .join(
        myFarmDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.window.start AND
            |farmDF.timestamp <= myFarmDF.window.end
          """.stripMargin))
      .select("farmDF.owner", "farmDF.fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testTwoDfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA")

    // save the aggregated df to kafka
    myFarmDF
      .select(to_json(struct(myFarmDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("update")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointAgg")
      .option("topic", aggTopic)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)

    // read the aggregated df from kafka as a stream
    val aggDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("startingOffsets", "earliest")
      .option("subscribe", aggTopic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", payloadSchemaA))
      .expand("value")
      .withWatermark("timestamp", "30 seconds")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .join(
        aggDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.timestamp - interval 1 hour AND
            |farmDF.timestamp <= myFarmDF.timestamp + interval 1 hour
          """.stripMargin))
      .select("farmDF.owner", "myFarmDF.fruitsA", "farmDF.fruits")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointTwo")
      .option("topic", outputTopicTwo)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def data(topic: String): DataFrame = {
    // let's read back the output topic using kafka batch
    spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", joinedDfSchema))
      .expand("value")
  }
}
```

Now, if I test on a streaming DataFrame:
```
scala> Test.testSelfAggJoinFail
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [structstojson(named_struct(owner, owner#59, fruits, fruits#60, fruitsA, fruitsA#78), Some(Etc/UTC)) AS value#96]
+- Project [owner#59, fruits#60, fruitsA#78]
   +- Project [owner#59, key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, fruits#60, fruitsA#78]
      +- Join Inner, (owner#59 = owner#82)
         :- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#59, value#51.fruits AS fruits#60]
         :  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
         :     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
         :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]
         +- SubqueryAlias myFarmDF
            +- Aggregate [owner#82], [owner#82, collect_list(fruits#83, 0, 0) AS fruitsA#78]
               +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#82, value#51.fruits AS fruits#83]
                  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
                     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:110)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:235)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
  at Test$.testSelfAggJoinFail(<console>:123)
  ... 51 elided
```

It fails with `Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark` because I don't use a watermark.
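For context, here is a minimal sketch of the kind of aggregation that check is asking for: as far as I understand, append mode needs a watermarked event-time column (or a window over it) among the grouping keys, which is essentially what `testSelfAggJoin` below does before the join. The value name `aggWithWatermark` is just for the sketch; it assumes the same imports as the `Test` object above.

```scala
import org.apache.spark.sql.functions.{col, collect_list, window}

// Sketch only: watermark the Kafka-provided `timestamp` column and include a
// window over it in the grouping keys, so the append-mode check is satisfied.
// This alone does not make the self-join produce output, as the next test shows.
val aggWithWatermark = farmDF
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window(col("timestamp"), "30 seconds"), col("owner"))
  .agg(collect_list(col("fruits")) as "fruitsA")
```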
Now, if I run the second test,

```scala
Test.testSelfAggJoin
```

I get these warnings:
```
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end
```

And I can check the result with
```
Test.data(Test.outputTopicSelf).show(false)

2018-09-12 16:08:01 WARN NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 2 : {test.kafka.sink.output.self=LEADER_NOT_AVAILABLE}
2018-09-12 16:08:01 WARN NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 6 : {test.kafka.sink.output.self=LEADER_NOT_AVAILABLE}

+---+-----+---------+------+---------+-------------+-----+------+-------+
|key|topic|partition|offset|timestamp|timestampType|owner|fruits|fruitsA|
+---+-----+---------+------+---------+-------------+-----+------+-------+
+---+-----+---------+------+---------+-------------+-----+------+-------+
```

which returns an empty DataFrame (probably because of the warnings?). I was unable to find a solution with a self-join.
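As a debugging step (my own sketch, not part of the original attempt), the windowed aggregation can be written to the console sink on its own, to see whether it ever emits anything in append mode before the join is involved. It assumes the same spark-shell session, with `farmDF` defined as above.

```scala
import org.apache.spark.sql.functions.{col, collect_list, window}

// Debugging sketch: run the aggregation alone against the console sink.
// If nothing is ever printed in append mode, the join is not the problem.
farmDF
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window(col("timestamp"), "30 seconds", "15 seconds"), col("owner"))
  .agg(collect_list(col("fruits")) as "fruitsA")
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()
```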
Finally, I tried sinking the aggregation to Kafka and re-reading it as a second streaming DataFrame (`testTwoDfAggJoin` above), then checking the output:
```
scala> Test.data(Test.outputTopicTwo).show(false)

+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
```

which works (although not very efficiently, I'd say), but if I add additional data to the source topic:
```
scala> Test.saveDfToKafka(Test.additionalData)

scala> Test.data(Test.outputTopicTwo).show(false)

+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
|null|test.kafka.sink.output.two|0        |6     |2018-09-12 16:59:37.125|0            |Bob  |["apple","avocado"]               |grapes |
|null|test.kafka.sink.output.two|0        |7     |2018-09-12 16:59:40.001|0            |Bob  |["apple","avocado","grapes"]      |apple  |
|null|test.kafka.sink.output.two|0        |8     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |avocado|
|null|test.kafka.sink.output.two|0        |9     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |grapes |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
```

I get many more rows, probably because I had to use `.outputMode("update")` while sinking the aggregation Df.
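One way I could imagine cleaning up the read-back view despite those extra rows (my own sketch, not something from the post) is to keep only the most recent joined record per `(owner, fruits)` pair when reading the output topic in batch, using the Kafka offset as a recency marker. The name `latestFirst` is just local to the sketch.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Sketch: deduplicate the batch read-back of the output topic, keeping the row
// with the highest Kafka offset for each (owner, fruits) pair, i.e. the one
// produced against the most recently seen aggregate.
val latestFirst = Window.partitionBy(col("owner"), col("fruits")).orderBy(col("offset").desc)

Test.data(Test.outputTopicTwo)
  .withColumn("rn", row_number().over(latestFirst))
  .filter(col("rn") === 1)
  .drop("rn")
  .show(false)
```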
- Is there a way to perform this aggregation without sending the aggregation back to Kafka as a separate topic?
- If not, is it possible to modify `testTwoDfAggJoin` to use `.outputMode("append")`?
withColumn("timestamp", current_timestamp())and groupBy("timestamp", "owner"). In general,appendoutput mode with aggregations is not a recommended way. As far as I understand, multiphase processing may help in such kind of tasks. Hope it helps!withColumn("timestamp", current_timestamp())andgroupBy("timestamp", "owner")and the result is the same: empty Df. I also tried without the join to test the aggregation part and it's still empty so apparently the problem is in the aggregation part.Test.sourceDf.withColumn("timestamp", current_timestamp()).groupBy($"timestamp", $"owner").agg(collect_list(col("fruits")) as "fruitsA")works perfectly on the static Dfupdatemode I got output. I have strong doubts that you will be able to do it inappendmode. My understanding is thatappendmode only formap-likeoperations, e.g. filter, transform entry etc. I believe that multiphase processing, with save to Kafka and read back, is the best way for you. So answer for both your questions would be "No". Try to create another stream for aggregated data and group by time window,