I am using Autoloader with schema inference to automatically load some data from S3.
I have one column that is a Map, which overwhelms Autoloader: it tries to infer it as a struct, creating a struct with every key it has ever seen as a field. So I just use a schema hint for that column.
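To see why struct inference blows up here, note that inferring a struct means taking the union of every map key observed across all records, so the schema grows with the data, while a `MAP<STRING,STRING>` hint keeps the schema constant. A minimal Python sketch of that key-union behaviour (plain dicts standing in for JSON records; all names are illustrative):

```python
# Simulate struct-style inference: the "schema" is the union of all keys ever seen.
records = [{"id": i, "map_col": {f"key{i}": f"val{i}"}} for i in range(1000)]

struct_fields = set()
for rec in records:
    struct_fields.update(rec["map_col"].keys())  # one new field per distinct key

# With a MAP<STRING,STRING> hint the schema is fixed: just a key type and a value type.
map_schema = ("string", "string")

print(len(struct_fields))  # 1000 -- grows linearly with the data
print(map_schema)          # stays fixed no matter how many keys appear
```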
My output DataFrame / Delta table looks exactly as expected, so the schema hint works great in that regard.
The only problem I am facing is that the schema hint does not seem to be taken into account during schema inference itself. The inference stage of the Spark job is very slow, and the schema file that Autoloader produces is still enormous, causing driver OOMs. Has anyone faced something similar before?
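For a sense of scale, if every ingested file contributes a couple of new keys, the inferred struct (and hence each schema file Auto Loader checkpoints) grows linearly with the number of files. A rough back-of-the-envelope in Python (the per-field byte cost and file counts are illustrative assumptions, not measurements):

```python
# Rough size estimate for an inferred-struct schema with many distinct map keys.
bytes_per_field = 60     # assumed JSON cost per struct field in the schema file
keys_per_file = 2        # assumed: each incoming file introduces two new keys
files = 1_000_000

fields = keys_per_file * files
schema_bytes = fields * bytes_per_field
print(f"~{schema_bytes // 1_000_000} MB of schema")  # ~120 MB to parse on the driver
```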
The code is very simple:
```python
spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaHints", SCHEMA_HINT) \
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{source_s3_bucket}/{source_table_name}") \
    .writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{target_s3_bucket}/_checkpoint/{source_table_name}") \
    .option("streamName", source_table_name) \
    .start(f"{target_s3_bucket}/{target_table_name}")
```

The following works fine in a Databricks notebook:
```shell
%sh
BASE_DIR=/root
S3_BASE='s3://my-bucket/kash/so_json_schema_hints/source_s3_bucket/input_json_files'

mkdir -p $BASE_DIR/source_s3_bucket/input_json_files
rm $BASE_DIR/source_s3_bucket/input_json_files/*

i=0
while [[ $i -le 100 ]]
do
  ((i++))
  ((i++))
  j=`echo $((i+1))`
  echo $BASE_DIR/source_s3_bucket/input_json_files/$i.json

cat << EO_F > $BASE_DIR/source_s3_bucket/input_json_files/$i.json
{"id": $i, "map_col": {"key$i": "val$i"}}
{"id": $j, "map_col": {"key$j": "val$j"}}
EO_F

  cat $BASE_DIR/source_s3_bucket/input_json_files/$i.json
  aws s3 cp $BASE_DIR/source_s3_bucket/input_json_files/$i.json $S3_BASE/$i.json
  sleep 20
done
```

```python
%python
source_table_name = 'input_json_files'
target_table_name = 'target_table_with_hints'
S3_BASE = 's3a://my-bucket/kash/so_json_schema_hints'

df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaHints", 'id INT, map_col MAP<STRING,STRING>')
    .option("cloudFiles.schemaLocation", f"{S3_BASE}/target_s3_bucket/_schema/{target_table_name}")
    .load(f"{S3_BASE}/source_s3_bucket/{source_table_name}")
)

df.printSchema()

q = (
    df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{S3_BASE}/target_s3_bucket/_checkpoint/{target_table_name}")
    .option("streamName", source_table_name)
    .start(f"{S3_BASE}/target_s3_bucket/{target_table_name}")
)

q.awaitTermination()

# prints:
#
# root
#  |-- id: integer (nullable = true)
#  |-- map_col: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- _rescued_data: string (nullable = true)
```

If you remove the line `.option("cloudFiles.schemaHints", 'id INT, map_col MAP<STRING,STRING>')` (you'll also have to change `target_table_name = 'target_table_no_hints'`), then the same code prints the following:
```python
# root
#  |-- id: long (nullable = true)
#  |-- map_col: struct (nullable = true)
#  |    |-- key2: string (nullable = true)
#  |    |-- key3: string (nullable = true)
#  |    |-- key4: string (nullable = true)
#  |    |-- key5: string (nullable = true)
#  |    |-- key6: string (nullable = true)
#  |    |-- key7: string (nullable = true)
#  |    |-- key8: string (nullable = true)
#  |    |-- key9: string (nullable = true)
#  |-- _rescued_data: string (nullable = true)
```
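That field explosion lines up with the generator: each file contributes two fresh keys, and struct inference keeps all of them. A quick sanity check of the expected field count, mirroring the bash loop above in plain Python (no Spark needed):

```python
# Mirror the bash loop: i starts at 0, is incremented twice per iteration,
# and each file holds two records with keys key$i and key$((i+1)).
keys = []
i = 0
while i <= 100:
    i += 2
    j = i + 1
    keys += [f"key{i}", f"key{j}"]

print(len(keys))  # 102 distinct keys -> a 102-field struct without the hint
print(keys[:4])   # ['key2', 'key3', 'key4', 'key5'], matching the printed schema
```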
Posting your SCHEMA_HINT and some sample data would also help.