Setup: Spark 3.2.3, with a Delta Sharing test server running locally.

I am writing data to and reading data from a Delta Lake table with Spark.

Now I would like to enable Change Data Feed (CDF) so that I can continuously read only the changes through Delta Sharing. Delta Sharing without CDF works.

The Spark configuration:

conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

Then I register the S3 path as a table and enable CDF on it:

spark.sql(f"CREATE TABLE IF NOT EXISTS retail USING DELTA LOCATION '{table_path}'")
spark.sql("ALTER TABLE retail SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
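To confirm that the property was actually applied, one option is to read it back with `SHOW TBLPROPERTIES`. This is a minimal sketch, assuming an active `spark` session with the Delta extensions loaded; the helper name `cdf_enabled` is hypothetical and not part of the original setup.

```python
def cdf_enabled(spark, table_name):
    """Return True if delta.enableChangeDataFeed is set to true on the table.

    `spark` is assumed to be an active SparkSession (an assumption of this sketch).
    """
    # SHOW TBLPROPERTIES yields one row per property, with columns `key` and `value`.
    rows = spark.sql(f"SHOW TBLPROPERTIES {table_name}").collect()
    props = {row["key"]: row["value"] for row in rows}
    # Property values come back as strings, so compare case-insensitively.
    return str(props.get("delta.enableChangeDataFeed", "false")).lower() == "true"
```

Calling `cdf_enabled(spark, "retail")` after the `ALTER TABLE` statement should return `True`. The check only inspects the Delta table itself.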

After creating new data in a DataFrame, I append it to the Delta table:

df.write.option("header", True).mode("append").format("delta").save(table_path)

When I read the data with a separate job directly from the Delta Lake, it works:

# BUCKET and s3folder are assumed to be defined earlier in the job
table_path = f"s3a://{BUCKET}{s3folder}/retail"
df = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "29")
    .load(table_path))

When using Delta Sharing, it fails:

table_path = DS_PROFILE_FILE + '#' + 'PM.catalog.transactions'
df = (spark.read.format("deltaSharing")
    .option("readChangeFeed", "true")
    .option("startingVersion", "29")
    .load(table_path))

with:

: io.delta.sharing.spark.util.UnexpectedHttpStatus: HTTP request failed with status: HTTP/1.1 400 Bad Request {"errorCode":"INVALID_PARAMETER_VALUE","message":"cdf is not enabled on table PM.catalog.transactions"}.

I have run out of ideas. Any suggestions?

2 Answers

1

To enable Change Data Feed (CDF) sharing via Delta Sharing, you need to ensure that both history sharing and CDF are enabled for the table on the Delta Sharing server.

cdfEnabled: true
historyShared: true

This is what the delta-sharing doc says:

If the table supports history sharing(tableConfig.cdfEnabled=true in the OSS Delta Sharing Server), the connector can query table changes.
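Once the server shares the change feed, the same query can also be issued from the `delta-sharing` Python package instead of Spark. The following is a rough sketch, assuming that package is installed; `table_url` and `read_changes` are hypothetical helper names, and the profile path passed in is whatever your profile file is called.

```python
def table_url(profile_file, share, schema, table):
    """Build the '<profile>#<share>.<schema>.<table>' URL used by the connectors."""
    return f"{profile_file}#{share}.{schema}.{table}"


def read_changes(profile_file, share, schema, table, starting_version):
    """Fetch table changes as a pandas DataFrame through Delta Sharing."""
    # Imported lazily so that table_url() works without the package installed.
    import delta_sharing

    url = table_url(profile_file, share, schema, table)
    return delta_sharing.load_table_changes_as_pandas(
        url, starting_version=starting_version
    )
```

For the table in the question this would be called as `read_changes(DS_PROFILE_FILE, "PM", "catalog", "transactions", 29)`. If the server still answers with `cdf is not enabled on table ...`, the problem lies in the server configuration rather than in the client.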

0

I finally found the solution in a delta.io blog post.

Enabling CDF on the Delta table is not enough. You also need to declare the table as CDF-enabled in the Delta Sharing server configuration:

shares:
- name: "PM"
  schemas:
  - name: "catalog"
    tables:
    - name: "transactions"
      location: "s3a://<bucket>/<path>/transactions"
      cdfEnabled: true
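A quick way to catch a missing flag before restarting the server is to look the table up in the parsed configuration. This is only a sketch: `find_table`, `shares_cdf` and `load_config` are hypothetical helpers, and `load_config` assumes PyYAML is available.

```python
def load_config(path):
    """Parse the server YAML config into a dict (assumes PyYAML is installed)."""
    import yaml  # imported lazily; only needed when reading from a file

    with open(path) as fh:
        return yaml.safe_load(fh)


def find_table(config, share, schema, table):
    """Return the table entry for share.schema.table, or None if it is absent."""
    for sh in config.get("shares", []):
        if sh.get("name") != share:
            continue
        for sc in sh.get("schemas", []):
            if sc.get("name") != schema:
                continue
            for tb in sc.get("tables", []):
                if tb.get("name") == table:
                    return tb
    return None


def shares_cdf(config, share, schema, table):
    """True only if the table is listed and has cdfEnabled set to true."""
    entry = find_table(config, share, schema, table)
    return bool(entry and entry.get("cdfEnabled", False))
```

With the configuration above, `shares_cdf(load_config(path), "PM", "catalog", "transactions")` should be `True`, where `path` points at your server config file.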
