
I want to stream data from the MS Defender lakehouse using Auto Loader. The data has this folder structure in Blob storage: y=2023/m=06/d=27/h=23/m=00

The problem is that the partition columns are also pulled into the stream, and there is a column named "m" twice: once for the month and once for the minute. I tried to explicitly select columns for the stream read, but that does not work. Any idea how to either omit the partition columns or drop them before I get the error?

My code:

bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("badRecordsPath", bad_record_path)
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(f"{defender_adls}")
    .selectExpr("Tenant", "category", "operationName", "properties", "tenantId", "time")
)

Error message:

[STREAM_FAILED] Query [id = f87a50cb-97f4-450f-a59c-b296109a21aa, runId = 68b2056e-767c-4df0-9935-c55f39d7c0e0] terminated with exception: [AMBIGUOUS_REFERENCE] Reference m is ambiguous, could be: [m, m].
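One way to keep the partition columns out of the stream entirely is Auto Loader's cloudFiles.partitionColumns option: passing an empty string asks Auto Loader not to derive any columns from the directory names. A minimal sketch of the same stream with that option set (assuming the option is supported on your Databricks Runtime):

# Same stream as above, but with partition-column inference disabled.
# An empty cloudFiles.partitionColumns means no y/m/d/h/m columns are
# derived from the path, so the duplicate "m" never enters the schema.
bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("badRecordsPath", bad_record_path)
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.partitionColumns", "")
    .load(f"{defender_adls}")
    .selectExpr("Tenant", "category", "operationName", "properties", "tenantId", "time")
)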

1 Answer

I tried the approach of filtering to drop a specific partition.

from pyspark.sql.functions import col

df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.format', 'CSV') \
    .option('cloudFiles.schemaLocation', schema_loc) \
    .option('header', True) \
    .load(source_data_loc)

df_filtered = df.filter((col('y') != '2023') & (col('m') != '06') & (col('d') != '27') & (col('h') != '23') & (col('m') != '00'))
df_filtered.display()  # show() is not supported on streaming DataFrames
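A caveat: filter() only drops rows, so both m columns stay in the schema, and referencing col('m') can itself raise the same AMBIGUOUS_REFERENCE error once both partitions are inferred. If you need to keep the values rather than drop the columns, a hypothetical workaround is a positional rename via toDF(), which never references the ambiguous name:

# Hypothetical sketch: rename duplicated column names positionally with
# toDF(); duplicated names get an index suffix, e.g. the two "m" columns
# might become m_2 and m_5 depending on their position in the schema.
names = [f"{c}_{i}" if df.columns.count(c) > 1 else c
         for i, c in enumerate(df.columns)]
df_renamed = df.toDF(*names)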


I have used a CSV file as an example.

ADLS file path: (screenshot showing the partitioned y=/m=/d=/h=/m= folder structure)

You can try the approach below:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

source_data_loc = "abfss://[email protected]/actual_data_csv/y=2023/m=06/d=27/h=23/m=00"
target_data_loc = "abfss://[email protected]/Autoloader/output.csv"
checkpoint_data_loc = "abfss://[email protected]/checkpoints"
schema_loc = "abfss://[email protected]/schema"

spark = SparkSession.builder.appName("AzureStorageExample").getOrCreate()

customSchema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.format', "CSV") \
    .option('cloudFiles.schemaLocation', schema_loc) \
    .option('header', True) \
    .schema(customSchema) \
    .load(source_data_loc)

# drop() removes every column matching a given name and ignores absent ones,
# so it also removes both "m" columns without an ambiguity error
df = df.drop('y', 'm', 'd', 'h')
df.display()


  • The above code defines a custom schema using StructType and StructField to specify the structure of the data, so Auto Loader does not need to infer it.
  • It then drops the partition-derived columns ('y', 'm', 'd', 'h') from the DataFrame; drop() removes every column matching a name and silently skips names that are absent (a write-side sketch follows below).
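display() is a Databricks notebook helper; in a plain Spark job you would attach a sink instead. A minimal write-side sketch, reusing the target_data_loc and checkpoint_data_loc defined above (the Delta format and availableNow trigger are illustrative assumptions, not part of the original answer):

# Illustrative sketch: write the cleaned stream to a Delta sink,
# processing the data currently available and then stopping.
(df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_data_loc)
    .trigger(availableNow=True)
    .start(target_data_loc))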

1 Comment

Exactly, I needed to create a schema and set it as an option on the stream:

json_schema = StructType([
    StructField('properties', StructType([
        StructField('AlertId', StringType(), True)
    ]))...

bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option('cloudFiles.useIncrementalListing', 'true')
    .schema(json_schema)
    ...
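For completeness, a self-contained sketch of what the comment describes, using a hypothetical reduced schema (the real Defender payload has many more fields under properties; only AlertId is shown, as in the comment, and the top-level fields are taken from the question's selectExpr):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical reduced schema; the full Defender schema is much larger.
json_schema = StructType([
    StructField("Tenant", StringType(), True),
    StructField("category", StringType(), True),
    StructField("operationName", StringType(), True),
    StructField("properties", StructType([
        StructField("AlertId", StringType(), True),
    ]), True),
    StructField("tenantId", StringType(), True),
    StructField("time", StringType(), True),
])

# With an explicit schema there is no schema inference, and the duplicate
# partition column never makes it into the stream.
bronze_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.useIncrementalListing", "true")
    .schema(json_schema)
    .load(f"{defender_adls}")
)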
