Setup:

- Spark 3.2.3
- Delta Sharing test server running locally
I am writing data to and reading data from a Delta Lake with Spark. Now I would like to enable CDF (Change Data Feed) so that I can continuously read only the changes via Delta Sharing. Delta Sharing without CDF works.
The Spark session configuration:
```python
conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
```
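For completeness, the session is built from this conf roughly as follows (the package coordinates and versions here are from memory and may differ from my actual setup):

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
# Assumed package coordinates: delta-core 2.0.x targets Spark 3.2.x and
# delta-sharing-spark provides the "deltaSharing" read format.
conf.set("spark.jars.packages",
         "io.delta:delta-core_2.12:2.0.2,io.delta:delta-sharing-spark_2.12:0.6.2")
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog",
         "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
```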
spark.sql(f"CREATE TABLE if not exists retail USING DELTA LOCATION '{table_path}'") spark.sql(f"ALTER TABLE retail SET TBLPROPERTIES(delta.enableChangeDataFeed = true)") Then after creating new data in a DataFrame and I append this data to the deltalake file.
Then, after creating new data in a DataFrame, I append it to the Delta table:

```python
# Note: the "header" option is a CSV option and is ignored by the Delta writer.
df.write.option("header", True).mode("append").format("delta").save(table_path)
```
table_path = f"s3a://{'BUCKET']}{s3folder}/retail" df = (spark.read.format("delta") .option("readChangeFeed", "true") .option("startingVersion", "29") .load(table_path)) When using DeltaSharing it fails:
When using Delta Sharing, the same read fails:

```python
table_path = DS_PROFILE_FILE + '#' + 'PM.catalog.transactions'
df = (spark.read.format("deltaSharing")
      .option("readChangeFeed", "true")
      .option("startingVersion", "29")
      .load(table_path))
```

with:
```
io.delta.sharing.spark.util.UnexpectedHttpStatus: HTTP request failed with status: HTTP/1.1 400 Bad Request {"errorCode":"INVALID_PARAMETER_VALUE","message":"cdf is not enabled on table PM.catalog.transactions"}
```
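Since the error comes from the server, I wonder whether CDF also has to be enabled on the server side. The reference Delta Sharing server supports per-table settings in its config; my guess is that the share entry needs something like this (a sketch with names adapted to my share, and cdfEnabled being the part I suspect is missing):

```yaml
# Sketch of the delta-sharing test server config (e.g. server.yaml).
# 'cdfEnabled' is my guess at the flag that controls the CDF endpoint.
shares:
- name: "PM"
  schemas:
  - name: "catalog"
    tables:
    - name: "transactions"
      location: "s3a://<bucket>/<s3folder>/retail"
      cdfEnabled: true
```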
Otherwise I have run out of ideas. Any suggestions?