The following works fine in a Databricks notebook:

%sh
BASE_DIR=/root
S3_BASE='s3://my-bucket/kash/so_json_schema_hints/source_s3_bucket/input_json_files'

mkdir -p $BASE_DIR/source_s3_bucket/input_json_files
rm $BASE_DIR/source_s3_bucket/input_json_files/*

i=0
while [[ $i -le 100 ]]
do
  ((i++))
  ((i++))
  j=$((i+1))
  echo $BASE_DIR/source_s3_bucket/input_json_files/$i.json
  cat << EO_F > $BASE_DIR/source_s3_bucket/input_json_files/$i.json
{"id": $i, "map_col": {"key$i": "val$i"}}
{"id": $j, "map_col": {"key$j": "val$j"}}
EO_F
  cat $BASE_DIR/source_s3_bucket/input_json_files/$i.json
  aws s3 cp $BASE_DIR/source_s3_bucket/input_json_files/$i.json $S3_BASE/$i.json
  sleep 20
done
%python
source_table_name = 'input_json_files'
target_table_name = 'target_table_with_hints'
S3_BASE = 's3a://my-bucket/kash/so_json_schema_hints'

df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaHints", 'id INT, map_col MAP<STRING,STRING>')
    .option("cloudFiles.schemaLocation", f"{S3_BASE}/target_s3_bucket/_schema/{target_table_name}")
    .load(f"{S3_BASE}/source_s3_bucket/{source_table_name}")
)
df.printSchema()

q = (
    df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{S3_BASE}/target_s3_bucket/_checkpoint/{target_table_name}")
    .option("streamName", source_table_name)
    .start(f"{S3_BASE}/target_s3_bucket/{target_table_name}")
)
q.awaitTermination()

# prints:
#
# root
#  |-- id: integer (nullable = true)
#  |-- map_col: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- _rescued_data: string (nullable = true)

If you remove the line .option("cloudFiles.schemaHints", 'id INT, map_col MAP<STRING,STRING>') (you'll also have to change target_table_name = 'target_table_no_hints'), the same code prints the following:

# root
#  |-- id: long (nullable = true)
#  |-- map_col: struct (nullable = true)
#  |    |-- key2: string (nullable = true)
#  |    |-- key3: string (nullable = true)
#  |    |-- key4: string (nullable = true)
#  |    |-- key5: string (nullable = true)
#  |    |-- key6: string (nullable = true)
#  |    |-- key7: string (nullable = true)
#  |    |-- key8: string (nullable = true)
#  |    |-- key9: string (nullable = true)
#  |-- _rescued_data: string (nullable = true)
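
To check what actually gets persisted at cloudFiles.schemaLocation, you can list and read the schema versions that Autoloader tracks there. The sketch below reuses the paths from the cell above and assumes the usual layout in which Autoloader keeps numbered schema-version files in a _schemas subdirectory of the schema location:

%python
schema_dir = f"{S3_BASE}/target_s3_bucket/_schema/{target_table_name}/_schemas"

# List the schema versions recorded so far; file size is a quick proxy for
# how large the tracked schema has grown.
files = dbutils.fs.ls(schema_dir)
for f in files:
    print(f.name, f.size)

# Peek at the latest version to see whether the hinted MAP type made it in.
versions = [f for f in files if f.name.isdigit()]
latest = max(versions, key=lambda f: int(f.name))
print(dbutils.fs.head(latest.path, 10000))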

Databricks Autoloader schema hints are not taken into consideration in the schema file

I am using Autoloader with schema inference to automatically load some data from S3 into a Delta table.

I have one column that is a map, which overwhelms Autoloader (it tries to infer it as a struct, creating a struct field for every key), so I just use a schema hint for that column.
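
The actual SCHEMA_HINT value is not spelled out in this post; for a single map column it would look roughly like the following, where map_col is just a placeholder for the real column name:

# Hypothetical example -- the real column name and value type are not shown in
# the question; adjust "map_col" and the types to match the actual data.
SCHEMA_HINT = "map_col MAP<STRING,STRING>"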

My output data frame / Delta Table looks exactly as expected, so the schema hint works great in that regard.

The only problem I am facing is that during schema inference the schema hint does not seem to be taken into account. That stage of the Spark job is very slow, and the schema file that Autoloader produces is still insanely huge, causing driver OOMs. Has anyone faced something similar before?

The code is very simple:

spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaHints", SCHEMA_HINT) \
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{source_s3_bucket}/{source_table_name}") \
    .writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{target_s3_bucket}/_checkpoint/{source_table_name}") \
    .option("streamName", source_table_name) \
    .start(f"{target_s3_bucket}/{target_table_name}")
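
Once the stream has run, a quick way to confirm that the hinted column really lands as a map in the output is to read the target Delta table back. A minimal sketch, reusing the variables above and the placeholder column name map_col:

from pyspark.sql import functions as F

# Read the Delta table written by the stream and check the resulting schema.
out = spark.read.format("delta").load(f"{target_s3_bucket}/{target_table_name}")
out.printSchema()

# If the hint was applied, map functions such as map_keys() work on the column.
out.select(F.map_keys("map_col").alias("keys")).show(5, truncate=False)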