I am stuck with this stream-stream leftOuter join. I am able to stream-join 2 DataFrames and get null values after the watermark time expires, but I cannot achieve the same when I join 3 DataFrames. For example, say I have 3 DataFrames read from Kafka (session, comparison, filter). My expectation is that after the watermark expires, if no data has arrived in the comparison or filter stream, the query should still output the values for the session. But for some reason that session gets lost: even after waiting 10 more minutes, I could not see it on the console!

I could not find any blog post where people join 3 data streams. Please help me find the issue: is there a limitation in Spark Structured Streaming on joining 3 or more streaming DataFrames? Let me know if you have faced a similar issue!

Below is the code I am trying:

    # spark, options and the *_source_topic variables are assumed to be defined earlier
    from pyspark.sql.functions import col, expr, split

    watermark_duration = "5 minutes"  # same watermark for all 3 streams
    interval_value = "5 minutes"

    session_df = spark.readStream.format("kafka").options(**options).option("subscribe", session_source_topic).load()
    session_df = session_df.selectExpr("CAST(value AS STRING) as session_raw_data", "Timestamp as session_Timestamp") \
        .withColumn("session_data", split("session_raw_data", "\\|")) \
        .selectExpr("session_data", "session_Timestamp")
    session_df = session_df.select(col("session_data")[0].alias("id"),
                                   col("session_data")[1].alias("session_id"),
                                   col("session_Timestamp")) \
        .withWatermark("session_Timestamp", watermark_duration)

    comparison_df = spark.readStream.format("kafka").options(**options).option("subscribe", comparison_source_topic).load()
    comparison_df = comparison_df.selectExpr("CAST(value AS STRING) as comparison_raw_data", "Timestamp as comparison_Timestamp") \
        .withColumn("comparison_data", split("comparison_raw_data", "\\|")) \
        .selectExpr("comparison_data", "comparison_Timestamp")
    comparison_df = comparison_df.select(col("comparison_data")[0].alias("comparison_id"),
                                         col("comparison_data")[1].alias("comparison_session_id"),
                                         col("comparison_Timestamp")) \
        .withWatermark("comparison_Timestamp", watermark_duration)  # watermark on the event-time column

    filter_df = spark.readStream.format("kafka").options(**options).option("subscribe", filter_source_topic).load()
    filter_df = filter_df.selectExpr("CAST(value AS STRING) as filter_raw_data", "Timestamp as filter_Timestamp_Timestamp") \
        .withColumn("filter_data", split("filter_raw_data", "\\|")) \
        .selectExpr("filter_data", "filter_Timestamp_Timestamp")
    filter_df = filter_df.select(col("filter_data")[0].alias("filter_id"),
                                 col("filter_data")[1].alias("filter_session_id"),
                                 col("filter_Timestamp_Timestamp")) \
        .withWatermark("filter_Timestamp_Timestamp", watermark_duration)

    comparison_join_expr = "session_id=comparison_session_id AND " + \
        " comparison_Timestamp between session_Timestamp and session_Timestamp + interval " + interval_value
    filter_join_expr = "session_id=filter_session_id AND " + \
        " filter_Timestamp_Timestamp between session_Timestamp and session_Timestamp + interval " + interval_value

    joined_df1 = session_df.join(comparison_df, expr(comparison_join_expr), "leftOuter") \
        .join(filter_df, expr(filter_join_expr), "leftOuter").drop("filter_session_id")

    # For testing
    session_df = session_df.writeStream.outputMode("append").format("console").start()
    comparison_df = comparison_df.writeStream.outputMode("append").format("console").start()
    filter_df = filter_df.writeStream.outputMode("append").format("console").start()
    joined_df1 = joined_df1.writeStream.outputMode("append").format("console").start()
    spark.streams.awaitAnyTermination()

My expectation is that if no data is available in comparison_df and filter_df, it should print the session_df values plus null values for the comparison_df and filter_df columns. I added an image of the expected output too.

(Image: expected output)

2 Answers


Currently, Structured Streaming supports only one stateful operator per pipeline; multiple stateful operators in a single query are not supported yet. Refer to this JIRA: https://issues.apache.org/jira/browse/SPARK-39585

There are 2 ways to tackle this:

  1. Move the joins inside foreachBatch (increase the batch window and the interval in the join condition). Although this makes the solution stateless, the outer join should still give you NULL for the right table.
  2. Move the slow-moving right streams to static storage (e.g. Hudi) and perform chained joins. This is again a stateless solution, but it will solve the issue.

Ankur


I was able to achieve the output with approach 1. Attaching the screenshot for your reference.

Ankur
