I have the code below, which fails when I attempt a stream-stream left outer join.

    import dlt
    from pyspark.sql import functions as F

    # `spark` is the session provided by the pipeline runtime.
    @dlt.view
    def vw_ix_f_activity_gold():
        return (
            spark.readStream
            .option("readChangeFeed", "true")
            .table("lakehouse_poc.poc_streaming.activity_silver")
            .alias("ACT")
            # Join with Oracle Activity Data
            .join(
                spark.readStream.table("lakehouse_poc.poc_streaming.ix_d_activity_gold")
                .withWatermark("_fivetran_synced", "5 seconds")
                .alias("DAC"),
                F.col("ACT.activity_seq") == F.col("DAC.BK_activity_seq"),
                "left"
            )
            # Select and rename columns
            .select( ... )
        )

    dlt.create_streaming_table(
        name = "ix_f_activity_gold",
    )

    dlt.apply_changes(
        target = "ix_f_activity_gold",
        source = "vw_ix_f_activity_gold",
        keys = ["BK_activity_seq"],
        sequence_by = "_fivetran_synced",
        stored_as_scd_type = 1
    )

I'm not sure why this fails, since I do have a watermark. Do I also need to specify one in the apply_changes part?

  • In your code, you have applied a watermark to the ix_d_activity_gold stream but not to the activity_silver stream. Both streams need watermarks for the join to work correctly. Commented Feb 28 at 10:12
  • That did not fix the issue; I still get the same error. Commented Feb 28 at 10:36

1 Answer

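Even though you’ve added a watermark in the vw_ix_f_activity_gold view, apply_changes may require an explicit expect_any_change_for parameter to ensure it correctly processes late-arriving data. Note that expect_any_change_for is not among the apply_changes arguments listed in the Databricks DLT Python reference (target, source, keys, sequence_by, stored_as_scd_type, and so on), so confirm it exists in your runtime before relying on it.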

    dlt.apply_changes(
        target="ix_f_activity_gold",
        source="vw_ix_f_activity_gold",
        keys=["BK_activity_seq"],
        sequence_by="_fivetran_synced",
        stored_as_scd_type=1,
        expect_any_change_for="5 seconds"  # Align with your watermark setting
    )
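
Separately from apply_changes, it may be worth checking the join itself. Spark's Structured Streaming guide lists stream-stream left outer joins as only conditionally supported: they need a watermark on the right side plus an event-time constraint in the join condition, and a watermark on the left side is optional but lets Spark clean up all state. The join in the question has only the key equality. Below is a minimal sketch of a time-bounded condition, not a confirmed fix, since the question does not show the error message. The helper names, the `_commit_timestamp` column as the left-side event time, and the `1 HOUR` gap are all assumptions for illustration.

```python
LEFT_ALIAS = "ACT"
RIGHT_ALIAS = "DAC"


def time_bounded_condition(left_key, right_key, left_ts, right_ts, max_gap):
    """Build a SQL join condition: key equality plus a symmetric event-time bound.

    max_gap is a Spark SQL interval body such as "1 HOUR" (hypothetical value).
    """
    l_ts = f"{LEFT_ALIAS}.{left_ts}"
    r_ts = f"{RIGHT_ALIAS}.{right_ts}"
    return (
        f"{LEFT_ALIAS}.{left_key} = {RIGHT_ALIAS}.{right_key} "
        f"AND {r_ts} >= {l_ts} - INTERVAL {max_gap} "
        f"AND {r_ts} <= {l_ts} + INTERVAL {max_gap}"
    )


def left_join_streams(left_df, right_df, condition, left_ts, right_ts, delay):
    """Watermark both inputs, alias them, and left-join on the given condition."""
    # Imported here so the condition builder above stays usable without Spark.
    from pyspark.sql import functions as F

    left = left_df.withWatermark(left_ts, delay).alias(LEFT_ALIAS)
    right = right_df.withWatermark(right_ts, delay).alias(RIGHT_ALIAS)
    return left.join(right, F.expr(condition), "left")


# Assumed column names: the question only shows "_fivetran_synced" on the right side.
condition = time_bounded_condition(
    left_key="activity_seq",
    right_key="BK_activity_seq",
    left_ts="_commit_timestamp",
    right_ts="_fivetran_synced",
    max_gap="1 HOUR",
)
```

Inside the view, the two `spark.readStream` DataFrames would be passed to `left_join_streams` along with `condition` before the `.select(...)`. With a time bound in place, unmatched left rows are emitted with NULLs only after the watermark has moved past the bound, so a larger gap means a longer delay before those rows appear.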