[FLINK-38962][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client#56
[FLINK-38962][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client#56uliandim wants to merge 1 commit intoapache:mainfrom
Conversation
| Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
| Hi, I don't have a JIRA account. Could a committer please create a JIRA ticket for this OpenSearch 3.x support feature, or let me know if there's an existing one I should reference? I'll then update the PR title accordingly. |
...rch3/src/main/java/org/apache/flink/connector/opensearch/sink/Opensearch3RequestIndexer.java Show resolved Hide resolved
488195d to 2deaf99 Compare ...-opensearch3/src/main/java/org/apache/flink/connector/opensearch/sink/Opensearch3Writer.java Outdated Show resolved Hide resolved
...or-opensearch3/src/main/java/org/apache/flink/connector/opensearch/sink/Opensearch3Sink.java Outdated Show resolved Hide resolved
| @snuyanzin could you help us here please? the implementation is pretty much aligned with https://github.com/apache/flink-connector-elasticsearch, thank you |
| @@ -0,0 +1,5 @@ | |||
| flink-sql-connector-opensearch3 | |||
| Copyright 2014-2024 The Apache Software Foundation | |||
There was a problem hiding this comment.
| Copyright 2014-2024 The Apache Software Foundation | |
| Copyright 2014-2026 The Apache Software Foundation |
?
| <version>${httpclient5.version}</version> | ||
| </dependency> | ||
| | ||
| <!-- Explicit httpcore5 to avoid version conflicts --> | ||
| <dependency> | ||
| <groupId>org.apache.httpcomponents.core5</groupId> | ||
| <artifactId>httpcore5</artifactId> | ||
| <version>5.3.1</version> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.httpcomponents.core5</groupId> | ||
| <artifactId>httpcore5-h2</artifactId> | ||
| <version>5.3.1</version> | ||
| </dependency> |
There was a problem hiding this comment.
finally we have same version for all, should we reuse ${httpclient5.version} or have a separate property?
There was a problem hiding this comment.
httpclient5 and httpcore5 do evolve separately, there is no version alignment
| <optional>true</optional> | ||
| </dependency> | ||
| | ||
| <!-- Opensearch Java Client (new official client) --> |
There was a problem hiding this comment.
what is new?
after half a year/1 or 2 years will it be still new?
Or should we have a bit different comment here, may be with mentioning a major version
| | ||
| public void assertThatIdsAreWritten(String index, Integer... ids) throws Exception { | ||
| // Wait for documents to be indexed | ||
| Thread.sleep(2000); |
There was a problem hiding this comment.
are we sure it is enough, ideally we should avoid such timeouts...
| | ||
| @Override | ||
| public Long map(Long value) throws Exception { | ||
| Thread.sleep(50); |
| throw new FlinkRuntimeException("Failed to initialize serialization schema.", e); | ||
| } | ||
| indexGenerator.open(); | ||
| objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
is there a reason we need to create it each time?
Can we instead having just one for all (static final) ?
Jackson javadoc says it should be ok if configuration is same https://github.com/FasterXML/jackson-databind/blob/16187a7ecf69fda4eacfab742ab74b07bd3d96d2/src/main/java/com/fasterxml/jackson/databind/ObjectMapper.java#L83-L88
| public Opensearch3SinkBuilder<IN> setHosts(HttpHost... hosts) { | ||
| checkNotNull(hosts); | ||
| checkState(hosts.length > 0, "Hosts cannot be empty."); | ||
| this.hosts = Arrays.asList(hosts); | ||
| return self(); | ||
| } |
There was a problem hiding this comment.
Do I understand it correctly: right now it validates that
hostsis not nullhostsis not empty
how about case when hosts is not null and have 2 elements one of which is null and second is not null?
| */ | ||
| public Opensearch3SinkBuilder<IN> setConnectionUsername(String username) { | ||
| checkNotNull(username); | ||
| this.username = username; |
There was a problem hiding this comment.
can user name be empty or blank?
| * @param maxSizeMb the maximum size of buffered actions, in mb. | ||
| * @return this builder | ||
| */ | ||
| public Opensearch3SinkBuilder<IN> setBulkFlushMaxSizeMb(int maxSizeMb) { |
There was a problem hiding this comment.
should we use something like memoryUnit here https://github.com/apache/flink/blob/4d4c30933543fb9796a941c356cb01ee84903da9/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java#L358-L363
?
| private int bulkFlushMaxActions = 1000; | ||
| private int bulkFlushMaxMb = -1; | ||
| private long bulkFlushInterval = -1; | ||
| private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE; | ||
| private int bulkFlushBackoffRetries = -1; | ||
| private long bulkFlushBackOffDelay = -1; | ||
| private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE; | ||
| private List<HttpHost> hosts; | ||
| protected Opensearch3Emitter<? super IN> emitter; | ||
| private String username; | ||
| private String password; | ||
| private String connectionPathPrefix; | ||
| private Integer connectionTimeout; | ||
| private Integer connectionRequestTimeout; | ||
| private Integer socketTimeout; | ||
| private Boolean allowInsecure; | ||
| private Opensearch3FailureHandler failureHandler = new DefaultFailureHandler(); |
There was a problem hiding this comment.
idea for follow up PR: I wonder if there should be a possibility to set these params via config options
| @@ -0,0 +1,382 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
There was a problem hiding this comment.
why do we have dependency-reduced-pom.xml here?
it should be in .gitignore
| @@ -0,0 +1,382 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
There was a problem hiding this comment.
drop this file, it should be in .gitignore
| @@ -0,0 +1,382 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | |||
There was a problem hiding this comment.
drop this file, it should be in .gitignore
| I put some minor comments |
| Hello everyone, we are very interested in this. Is there any blockers holding the release back? |
The new connector uses the official opensearch-java client instead of the deprecated rest-high-level-client, providing better compatibility with OpenSearch 3.x and future versions.