
I am using Autoloader with schema inference to automatically load data from S3 into a Delta table.

I have one column that is a map, which overwhelms Autoloader (it tries to infer it as a struct, creating a struct with every key as a property), so I use a schema hint for that column.
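(For illustration, here is a small pure-Python sketch, outside Spark, of why struct-style inference explodes when every record carries different map keys; the record shape mirrors my data, the numbers are made up:)

```python
import json

# Records shaped like mine: each row's map_col has a distinct key.
records = [
    json.loads('{"id": %d, "map_col": {"key%d": "val%d"}}' % (i, i, i))
    for i in range(1000)
]

# Struct-style inference merges the union of all keys ever seen,
# so the "schema" grows with the data ...
struct_fields = set()
for rec in records:
    struct_fields.update(rec["map_col"])
print(len(struct_fields))  # 1000 distinct struct fields

# ... while a MAP<STRING,STRING> hint is one fixed (key, value) type,
# no matter how many distinct keys appear.
map_schema = ("string", "string")
print(map_schema)
```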

My output DataFrame / Delta table looks exactly as expected, so the schema hint works great in that regard.

The only problem I am facing is that the schema hint does not seem to be taken into account during schema inference. That stage of the Spark job is very slow, and the schema file that Autoloader produces is still enormous, causing driver OOMs. Has anyone faced something similar before?

The code is very simple:

```python
spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaHints", SCHEMA_HINT) \
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{source_s3_bucket}/{source_table_name}") \
    .writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{target_s3_bucket}/_checkpoint/{source_table_name}") \
    .option("streamName", source_table_name) \
    .start(f"{target_s3_bucket}/{target_table_name}")
```

The following works fine in a Databricks notebook:

```shell
%sh
BASE_DIR=/root
S3_BASE='s3://my-bucket/kash/so_json_schema_hints/source_s3_bucket/input_json_files'

mkdir -p $BASE_DIR/source_s3_bucket/input_json_files
rm $BASE_DIR/source_s3_bucket/input_json_files/*

i=0
while [[ $i -le 100 ]]
do
  ((i++))
  ((i++))
  j=`echo $((i+1))`
  echo $BASE_DIR/source_s3_bucket/input_json_files/$i.json
  cat << EO_F > $BASE_DIR/source_s3_bucket/input_json_files/$i.json
{"id": $i, "map_col": {"key$i": "val$i"}}
{"id": $j, "map_col": {"key$j": "val$j"}}
EO_F
  cat $BASE_DIR/source_s3_bucket/input_json_files/$i.json
  aws s3 cp $BASE_DIR/source_s3_bucket/input_json_files/$i.json $S3_BASE/$i.json
  sleep 20
done
```
```python
%python
source_table_name = 'input_json_files'
target_table_name = 'target_table_with_hints'
S3_BASE = 's3a://my-bucket/kash/so_json_schema_hints'

df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaHints", 'id INT, map_col MAP<STRING,STRING>')
    .option("cloudFiles.schemaLocation", f"{S3_BASE}/target_s3_bucket/_schema/{target_table_name}")
    .load(f"{S3_BASE}/source_s3_bucket/{source_table_name}")
)

df.printSchema()

q = (
    df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{S3_BASE}/target_s3_bucket/_checkpoint/{target_table_name}")
    .option("streamName", source_table_name)
    .start(f"{S3_BASE}/target_s3_bucket/{target_table_name}")
)

q.awaitTermination()

# prints:
#
# root
#  |-- id: integer (nullable = true)
#  |-- map_col: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- _rescued_data: string (nullable = true)
```

If you remove the line `.option("cloudFiles.schemaHints", 'id INT, map_col MAP<STRING,STRING>')` (you'll also have to change `target_table_name = 'target_table_no_hints'`), then the same code prints the following:

```python
# root
#  |-- id: long (nullable = true)
#  |-- map_col: struct (nullable = true)
#  |    |-- key2: string (nullable = true)
#  |    |-- key3: string (nullable = true)
#  |    |-- key4: string (nullable = true)
#  |    |-- key5: string (nullable = true)
#  |    |-- key6: string (nullable = true)
#  |    |-- key7: string (nullable = true)
#  |    |-- key8: string (nullable = true)
#  |    |-- key9: string (nullable = true)
#  |-- _rescued_data: string (nullable = true)
```
  • Schema inference requires a complete pass through the source data to guess ("infer") the schema. Far better to define it manually, even if it requires some effort. Commented Jul 15, 2024 at 15:34
  • I don't want to enforce the schema, because it could change later on. It is acceptable that inference is more expensive compute-wise; what I don't understand is why the schema hints don't seem to influence it. The schema file that Autoloader schema inference produces is not affected by the schema hints. Commented Jul 15, 2024 at 19:24
  • @RobertKossendey, can you add an example? E.g. print the value of SCHEMA_HINT; some sample data would also help. Commented Jul 30, 2024 at 19:20
  • I added a working example, could you modify it to reproduce your problem? Commented Jul 30, 2024 at 21:26
  • This should reproduce my schema. Could you check the cloudFiles.schemaLocation file? It should contain the "non-schema-hint" schema. Commented Jul 31, 2024 at 6:46
