[FLINK-29549]- Flink Glue Catalog integration#206
fmorillo7694 wants to merge 16 commits into apache:main
Co-Authored-By: Anthony Pounds-Cornish <antpc@amazon.co.uk>
refactoring directory Adding Parent Pom
Samrat002 left a comment
Thanks for the contribution. A few points to note:
It looks like most of the code has already been reviewed across multiple PRs.
Concerns:
- The current implementation exposes low-level Glue details (e.g., support for lowercase table names) directly through the Flink Catalog. Is this acceptable? I recommend raising this topic with the community. In my opinion, it would be better to encapsulate such Glue-specific behavior and avoid exposing it directly via the Flink Catalog interface. The catalog's behavior should remain consistent with other catalogs, with differences controlled via configuration only.
- Please consider implementing the basic configuration options defined in FLIP-277. If that's not feasible in this PR, a fast follow-up would be valuable, especially for users relying on different credential modes.
- It seems that some `.idea` folder files have been committed. Please remove them from the PR.
Cheers,
Samrat
Hey Samrat. How could we encapsulate these specific issues? From the user's perspective, we already prevent them from creating tables and databases with uppercase names. As for the schema, we store the original column names in the column parameters in Glue, so even though users see their schema in Glue in lower case (the Glue default), we actually use the original column names.
Thanks for the detailed response. Can you help me understand the technical complexity to support CaseSensitivity from GlueCatalog? This is a deviation.
We can start a thread in the community to discuss this approach. I am fine with either approach as long as the community agrees on it.
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
     */
    @Override
    public void close() throws CatalogException {
        if (glueClient != null) {
Is there a scenario where glueClient would be null?
glueClient is created lazily.
I think the client can be null in the following scenario:
- Create a catalog.
- Never run any DDL or DML SQL command.
- Close the catalog or shell.
The guard prevents an unexpected NullPointerException in that case.
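The scenario above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SketchClient` and `LazyClientCatalogSketch` are hypothetical stand-ins for the Glue client and the catalog, and the factory is assumed to be invoked only on first use.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a closeable service client (not the AWS SDK type). */
interface SketchClient {
    void close();
}

/** Hypothetical catalog that creates its client lazily, as described in the thread. */
final class LazyClientCatalogSketch {
    private final Supplier<SketchClient> clientFactory;
    private SketchClient client; // stays null until the first operation needs it

    LazyClientCatalogSketch(Supplier<SketchClient> clientFactory) {
        this.clientFactory = clientFactory;
    }

    /** Creates the client on first use and returns the same instance afterwards. */
    SketchClient client() {
        if (client == null) {
            client = clientFactory.get();
        }
        return client;
    }

    /** Safe to call even if no operation ever triggered client creation. */
    void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
```

Without the null check, closing a catalog that never ran a statement would dereference a null field.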
    int maxRetries = 3;
    int retryCount = 0;
    long retryDelayMs = 200;
    while (retryCount < maxRetries) {
Curious what the motivation for the retries is here. Is there a specific RuntimeException that we have in mind?
I had a look at the AWS SDK; close is best-effort and logs any exceptions: https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/IoUtils.java#L76-L85
We can simplify the code here by removing the retry logic, as the SDK does not seem to surface exceptions during close().
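For illustration, a best-effort close without retries could look like the sketch below. It is not the SDK's implementation and not the PR's code; `QuietCloseSketch` and `closeQuietly` are hypothetical names, and the boolean return value exists only to make the behaviour observable.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper showing a single best-effort close attempt with logging. */
final class QuietCloseSketch {
    private static final Logger LOG = Logger.getLogger(QuietCloseSketch.class.getName());

    private QuietCloseSketch() {}

    /**
     * Closes the resource once. Failures are logged and swallowed rather than retried.
     *
     * @return true if there was nothing to close or close() completed normally
     */
    static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return true;
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Ignoring failure while closing resource", e);
            return false;
        }
    }
}
```

A retry loop adds little when the underlying close is already best-effort, since there is usually no failure for the caller to react to.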
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
        .tableList();

    // Filter tables to only include those that are of type VIEW
    List<String> viewNames = allTables.stream()
@Samrat002 @FranMorilloAWS about case sensitivity, these are the logical rules I suggest:
I would add that the way the additional metadata (the original name and any additional info required) is stored in Glue should be clearly documented, in case a user wants to build their own external interface to extract or update information in the Glue Catalog.
Hello @Samrat002 @nicusX. Using database parameters and table parameters, we can now use lower or upper case when defining database and table names. The SHOW TABLES, SHOW DATABASES, and DESCRIBE commands return the original Flink definition, even though the Glue UI shows everything in lower case.
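One way to picture this approach is the sketch below. It is an assumption-laden illustration rather than the PR's implementation: the parameter key `flink.original-name` and all method names are hypothetical, and a plain `Map` stands in for the Glue database or table parameters.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of keeping the original Flink name next to the lowercased Glue name. */
final class OriginalNameSketch {
    /** Assumed parameter key; the key actually used by the PR is not shown in this thread. */
    static final String ORIGINAL_NAME_KEY = "flink.original-name";

    private OriginalNameSketch() {}

    /** Name as it would be stored in (and shown by) Glue. */
    static String toGlueName(String flinkName) {
        return flinkName.toLowerCase(Locale.ROOT);
    }

    /** Returns a copy of the parameters with the original Flink name recorded. */
    static Map<String, String> withOriginalName(String flinkName, Map<String, String> parameters) {
        Map<String, String> copy = new HashMap<>(parameters);
        copy.put(ORIGINAL_NAME_KEY, flinkName);
        return copy;
    }

    /** Name to show in Flink; falls back to the Glue name when no original was recorded. */
    static String toFlinkName(String glueName, Map<String, String> parameters) {
        return parameters.getOrDefault(ORIGINAL_NAME_KEY, glueName);
    }
}
```

Falling back to the Glue name keeps objects created outside Flink readable, since they carry no such parameter.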
Thanks for incorporating the changes. I will review the PR in the next couple of days.
Samrat002 left a comment
Patch looks great. Thank you for making the change.
minor comments
Cheers,
Samrat
    # AWS Glue Catalog

    The AWS Glue Catalog provides a way to use [AWS Glue](https://aws.amazon.com/glue) as a catalog for Apache Flink.
    This allows users to access Glue's metadata store directly from Flink SQL and Table API.
In my understanding, the DataStream API can also utilize the Glue Catalog. Is that not true?
    Before getting started, ensure you have the following:

    - **AWS account** with appropriate permissions for AWS Glue and other required services
    - **AWS credentials** properly configured
Can you add a link to the AWS documentation on how to configure credentials?
    String region = config.get(REGION.key());
    String defaultDatabase = config.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());

    // Ensure required properties are present
    GetUserDefinedFunctionsRequest.Builder functionsRequest =
            GetUserDefinedFunctionsRequest.builder()
                    .databaseName(databaseName);
    List<String> glueFunctions;
If there is no function present, then glueFunctions will be a null object. This might be an issue.
Can you change the declaration as follows?
    - List<String> glueFunctions;
    + List<String> glueFunctions = new ArrayList<>();
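The effect of the suggested initialization can be sketched as below. This is only an illustration under assumptions: `pages` is a hypothetical stand-in for the function names returned by successive paginated responses, and no AWS SDK types are used.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: collecting function names into a list that is never null. */
final class FunctionListingSketch {
    private FunctionListingSketch() {}

    /**
     * Flattens the per-page name lists into one list.
     * Returns an empty list, not null, when there are no pages or no functions.
     */
    static List<String> collectFunctionNames(List<List<String>> pages) {
        List<String> glueFunctions = new ArrayList<>(); // initialized up front, as suggested
        for (List<String> page : pages) {
            if (page != null) {
                glueFunctions.addAll(page);
            }
        }
        return glueFunctions;
    }
}
```

Callers can then iterate or stream over the result without a null check.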
     * @param glueClient The Glue client used for interacting with the AWS Glue service.
     * @param catalogName The catalog name associated with the Glue operations.
     */
    protected GlueOperator(GlueClient glueClient, String catalogName) {
Add a null check on the input parameter glueClient.
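A minimal sketch of such a check, using only the JDK, is shown below. `GlueOperatorSketch` is a hypothetical stand-in for the class under review, and `Object` replaces the real client type so the example has no AWS dependency; checking `catalogName` as well is an extra assumption. In the Flink code base, `org.apache.flink.util.Preconditions.checkNotNull` would likely be the more conventional choice.

```java
import java.util.Objects;

/** Hypothetical stand-in for the operator base class, with fail-fast argument checks. */
abstract class GlueOperatorSketch {
    protected final Object glueClient; // stand-in for the real Glue client type
    protected final String catalogName;

    protected GlueOperatorSketch(Object glueClient, String catalogName) {
        // Fail at construction time instead of with a later, harder-to-trace NullPointerException.
        this.glueClient = Objects.requireNonNull(glueClient, "glueClient must not be null");
        this.catalogName = Objects.requireNonNull(catalogName, "catalogName must not be null");
    }
}
```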
Can we refer to Paimon's solution for integrating Glue? https://issues.apache.org/jira/browse/FLINK-38457 @FranMorilloAWS
@melin. To my knowledge, Paimon's integration with the Glue Data Catalog stores Paimon tables in Glue; however, it wouldn't work for storing Kinesis, MSK, and other streaming sources. They have different implementations.
- Remove scala.binary.version property (Flink 2.0 dropped Scala deps)
- Replace flink-table-planner_2.12 with flink-table-planner-loader
- Migrate CatalogTable.of() to CatalogTable.newBuilder() API (removed in Flink 2.0)
- All 120 tests pass