
I am using Databricks Autoloader with PySpark to stream Parquet files into a Delta table. Here's a simplified version of what I am doing:

spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .load("path") \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .toTable("my_table")

What I want to ensure is that every ingested file has the exact same column names and order as the target Delta table (my_table). This is to avoid scenarios where, for example, column values are written into incorrect columns due to schema mismatches.

I know that .schema(...) can be used on readStream, but this seems to enforce a static schema — whereas I want to validate the schema of each incoming file dynamically and reject any file that does not match.

I was hoping to use .foreachBatch(...) to perform per-batch validation logic before writing to the table, but .foreachBatch() is not available on .readStream(). Is there a way to validate incoming file schema (names and order) before writing with Autoloader?

Alternatively, if I could get Autoloader to tell me which files are queued to be loaded next, maybe I could inspect the Parquet footers myself without advancing Autoloader's checkpoint?

1 Answer

foreachBatch() is available on the write side of the stream (writeStream), not on the streaming DataFrame itself, so you can run validation per micro-batch before anything is committed to the table:

from pyspark.sql.functions import lit

# Target schema from the existing Delta table
target_schema = spark.table("my_table").schema

def validate_and_write(batch_df, batch_id):
    incoming_schema = batch_df.schema
    # Compare schema (StructType equality checks field names, order,
    # data types, and nullability)
    if incoming_schema == target_schema:
        batch_df.write.format("delta").mode("append").saveAsTable("my_table")
    else:
        # Log or store invalid schema information
        print(f"Schema mismatch in batch {batch_id}")
        # Optionally write bad records to a quarantine or audit location
        batch_df.withColumn("error", lit("Schema Mismatch")) \
            .write.format("delta").mode("append").save("/tmp/quarantine")
        # Or skip the batch altogether (just return)

spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .load("path") \
    .writeStream \
    .foreachBatch(validate_and_write) \
    .start()
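One caveat: comparing full `StructType`s also compares data types and nullability, so two files with identical column names in identical order can still be rejected if, say, a nullability flag differs. If the requirement is strictly names and order, a narrower check is easy to sketch in plain Python. `columns_match` below is a hypothetical helper, not a Spark API; in PySpark the name lists would come from `batch_df.columns` and `spark.table("my_table").columns`:

```python
def columns_match(incoming, target):
    """Return True when the two column-name lists are identical,
    including order. `incoming` and `target` are plain lists of
    strings; in PySpark they would come from `df.columns`."""
    return list(incoming) == list(target)

# Usage sketch inside the foreachBatch function (hypothetical):
#   if columns_match(batch_df.columns, spark.table("my_table").columns):
#       ... append to the table ...
print(columns_match(["id", "name"], ["id", "name"]))  # True
print(columns_match(["name", "id"], ["id", "name"]))  # False: order differs
```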

2 Comments

Here is the documentation confirming that it supports foreachBatch() - docs.databricks.com/aws/en/structured-streaming/foreach
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.
