5,587 questions
1 vote
1 answer
48 views
Limitations when defining a Spark time window
I have PySpark streaming code, which follows: parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \ .select( from_json(col("...
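In Structured Streaming, time windows are usually built with `window()` over an event-time column plus a watermark. As a minimal sketch (the column name `timestamp` follows the question's snippet; the window sizes are assumptions), the bucketing a 10-minute tumbling window performs can be shown in plain Python, with the PySpark equivalent in comments:

```python
from datetime import datetime, timezone

# PySpark equivalent (hedged sketch, not the asker's actual code):
#   parsed_df.withWatermark("timestamp", "10 minutes") \
#            .groupBy(window(col("timestamp"), "10 minutes")) \
#            .count()

def tumbling_window_start(ts: datetime, window_seconds: int) -> datetime:
    """Return the start of the tumbling window containing ts."""
    epoch = ts.timestamp()
    return datetime.fromtimestamp(epoch - epoch % window_seconds, tz=timezone.utc)

ts = datetime(2024, 1, 1, 12, 7, 30, tzinfo=timezone.utc)
print(tumbling_window_start(ts, 600).isoformat())  # 2024-01-01T12:00:00+00:00
```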
0 votes
0 answers
73 views
Latency too high for a Spark Structured Streaming job
I am running a Structured Streaming workflow in Databricks that reads a data stream from Kinesis and writes it to an external Delta table (on S3). This workflow is running 4 streams ...
0 votes
0 answers
86 views
Spark query plan takes 5+ minutes to flatten 1000+ columns in Spark Structured Streaming
I have a Spark DataFrame of ~60 columns. The schema contains multi-level structs, and flattening the DataFrame expands it to ~1,500 columns. The flattening logic is typical to ...
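The expansion described above, where nested structs multiply ~60 top-level columns into ~1,500 leaves, comes from recursively walking the schema. A minimal sketch of that walk, using a plain dict as a stand-in for a Spark `StructType` (an assumption; in PySpark the same recursion runs over `df.schema.fields`):

```python
# Recursively expand nested struct fields into dotted column paths.
def flatten_fields(schema: dict, prefix: str = "") -> list[str]:
    cols = []
    for name, dtype in schema.items():
        path = f"{prefix}{name}"
        if isinstance(dtype, dict):  # nested struct: recurse one level deeper
            cols.extend(flatten_fields(dtype, path + "."))
        else:
            cols.append(path)
    return cols

schema = {"id": "long", "user": {"name": "string", "geo": {"lat": "double", "lon": "double"}}}
print(flatten_fields(schema))
# ['id', 'user.name', 'user.geo.lat', 'user.geo.lon']
```

Each dotted path would become a `col("user.geo.lat").alias("user_geo_lat")`-style projection, which is why the analyzed plan grows with the leaf count.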
1 vote
0 answers
96 views
PySpark Autoloader: How to enforce schema and fail on mismatch?
Hi all, I am using Databricks Autoloader with PySpark to ingest Parquet files from a directory. Here's a simplified version of my current setup: spark.readStream \ .format("cloudFiles") \ ....
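Per the Databricks Auto Loader docs, schema evolution behavior is controlled by `cloudFiles.schemaEvolutionMode`; setting it to `failOnNewColumns` makes the stream fail when a file introduces columns outside the expected schema, which matches the "fail on mismatch" requirement. A hedged sketch of the options (the paths and `expected_schema` in the comment are assumptions):

```python
# Auto Loader options that fail ingestion on schema mismatch instead of
# silently evolving the schema.
autoloader_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaEvolutionMode": "failOnNewColumns",
}

# Applied to the question's setup, roughly:
#   spark.readStream.format("cloudFiles") \
#        .options(**autoloader_options) \
#        .schema(expected_schema) \
#        .load(source_path)
print(autoloader_options["cloudFiles.schemaEvolutionMode"])  # failOnNewColumns
```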
0 votes
0 answers
50 views
Backfill (streaming) jobs
I have Spark notebooks for ingesting data from a queue (Kafka etc.) into the bronze-layer tables (for instance lh_bronze.order_delta) in the lakehouse. These notebooks ingest CDC data. For backfill (...
0 votes
0 answers
28 views
Spark Streaming error: Required attribute 'value' not found
I'm trying to implement a to-protobuf transformation inside Spark Streaming code that reads data from a Kafka topic. Incoming dataframe: readStreamFromKafka(config).writeStream .foreachBatch { (...
0 votes
0 answers
19 views
Set up dynamic allocation for a Spark job with a data rate of about 450k
How do I set up dynamic allocation for a Spark job with a data rate of about 450k? I tried the configurations below, but the executor pods always run with the maximum executors and it's ...
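One common explanation for executors pinning at the maximum in streaming jobs is that micro-batches keep arriving before the default 60s idle timeout expires, so executors are never released. A hedged sketch of the relevant settings for Kubernetes (all numeric values are illustrative assumptions, not tuned recommendations):

```python
# Dynamic-allocation settings for a streaming job on Kubernetes.
# shuffleTracking is needed there because k8s has no external shuffle
# service; shorter idle timeouts let executors actually scale down
# between micro-batches.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "20",
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
    "spark.dynamicAllocation.shuffleTracking.timeout": "30s",
}

# Applied at session build time, roughly:
#   builder = SparkSession.builder
#   for k, v in dynamic_allocation_conf.items():
#       builder = builder.config(k, v)
print(len(dynamic_allocation_conf))  # 6
```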
0 votes
0 answers
164 views
Why is Databricks Autoloader failing to merge new columns with schema evolution?
I'm using Databricks Autoloader to load parquet files: def run_autoloader(table_name, checkpoint_path, latest_file_location, new_columns): # Configure Auto Loader to ingest parquet data to a Delta ...
0 votes
1 answer
79 views
Stream-stream LeftOuter join is not supported without a watermark in the join keys
I have the below code that fails when I'm attempting to do a stream-stream left outer join. @dlt.view def vw_ix_f_activity_gold(): return ( spark.readStream .option("...
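Per the Structured Streaming guide, a stream-stream LEFT OUTER join requires watermarks on both input streams and a time-range constraint in the join condition, which is what the error above is reporting. A hedged sketch of the required shape (table names, column names, and the one-hour bounds are all assumptions):

```python
# Stream-stream left outer join requirements, sketched as commented
# PySpark (it needs a live SparkSession/pipeline to actually run):
#
#   left = spark.readStream.table("lh.activity") \
#       .alias("l").withWatermark("event_time", "1 hour")
#   right = spark.readStream.table("lh.lookup") \
#       .alias("r").withWatermark("event_time", "2 hours")
#   joined = left.join(right, expr(join_condition), "leftOuter")

# The join condition must bound the right stream's event time relative
# to the left's; without this range constraint Spark cannot bound state
# and rejects the outer join.
join_condition = (
    "l.key = r.key AND "
    "r.event_time BETWEEN l.event_time - INTERVAL 1 HOUR "
    "AND l.event_time + INTERVAL 1 HOUR"
)
print("INTERVAL" in join_condition)  # True
```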
1 vote
0 answers
51 views
Read a MongoDB document as a string/text using Databricks readStream/writeStream
I'm trying to read documents from MongoDB into Databricks using Spark Structured Streaming. I'm using to_json() to convert the whole document to a string. When using this, the schema evolution is working ...
0 votes
0 answers
49 views
[Spark Streaming]: stream batch processing degrades over time, causing Kafka lag
I have been using Spark v3.5's streaming functionality for the use case below. I am observing the following issue in one of the environments. I would appreciate some assistance with ...
0 votes
0 answers
150 views
Improve Latency with Delta Live Tables
Use Case: I am loading the Bronze layer using an external tool, which automatically creates bronze Delta tables in Databricks. However, after the initial load, I need to manually enable changeDataFeed ...
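Since the bottleneck described above is manually enabling change data feed after each load, one hedged option is to set the property via SQL after table creation, or to default it on for new tables via a Spark conf. The table name below is an assumption:

```python
# Enable change data feed on an existing Bronze Delta table.
enable_cdf_sql = (
    "ALTER TABLE bronze.orders "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)
# In a notebook: spark.sql(enable_cdf_sql)
#
# To default CDF on for tables created afterwards in the session,
# the Delta docs describe the conf:
#   spark.databricks.delta.properties.defaults.enableChangeDataFeed = true
print(enable_cdf_sql)
```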
-1 votes
1 answer
190 views
Does a Spark DataFrame read from a Delta table auto-refresh?
I've inherited some Spark code and am having trouble understanding "why does this even work". The short version is that I load a DF with info from a Delta table, and then join that to a streaming ...
0 votes
1 answer
101 views
Delta Live Tables - can't update
Objective I plan to use Delta Live Tables (DLT) to deliver near real-time reporting in Power BI. Current Setup I load Bronze Delta tables every 1 minute using Fivetran. These Bronze tables serve as ...
0 votes
1 answer
143 views
Delta Live Tables - Source data for the APPLY CHANGES must be a streaming query
Use Case I am ingesting data using Fivetran, which syncs data from an Oracle database directly into my Databricks table. Fivetran manages the creation, updates, and inserts on these tables. As a ...
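The error in the title arises because `dlt.apply_changes` requires its source to be a streaming query; a batch read of the Fivetran-managed table won't qualify. A hedged sketch of one common pattern, exposing the table as a streaming DLT view and feeding that into APPLY CHANGES (table, key, and sequence column names are assumptions):

```python
# Arguments for dlt.apply_changes; the source must name a streaming view.
apply_changes_args = {
    "target": "orders_silver",
    "source": "oracle_source",        # must resolve to a streaming query
    "keys": ["order_id"],
    "sequence_by": "_fivetran_synced",
}

# Inside a DLT pipeline this would be invoked roughly as:
#   import dlt
#   @dlt.view
#   def oracle_source():
#       return spark.readStream.table("fivetran_db.orders")  # streaming read
#   dlt.create_streaming_table("orders_silver")
#   dlt.apply_changes(**apply_changes_args)
print(apply_changes_args["source"])  # oracle_source
```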