Skip to content

[FLINK-38962][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client#56

Open
uliandim wants to merge 1 commit intoapache:mainfrom
uliandim:main
Open

[FLINK-38962][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client#56
uliandim wants to merge 1 commit intoapache:mainfrom
uliandim:main

Conversation

@uliandim
Copy link
Copy Markdown

The new connector uses the official opensearch-java client instead of the deprecated rest-high-level-client, providing better compatibility with OpenSearch 3.x and future versions.

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg bot commented Jan 22, 2026

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@uliandim
Copy link
Copy Markdown
Author

Hi, I don't have a JIRA account. Could a committer please create a JIRA ticket for this OpenSearch 3.x support feature, or let me know if there's an existing one I should reference? I'll then update the PR title accordingly.

@uliandim uliandim changed the title Add OpenSearch 3.x support with opensearch-java client [hotfix][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client Jan 22, 2026
@uliandim uliandim changed the title [hotfix][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client [FLINK-38962] Add OpenSearch 3.x support with opensearch-java client Jan 22, 2026
@uliandim uliandim changed the title [FLINK-38962] Add OpenSearch 3.x support with opensearch-java client [FLINK-38962][connectors/opensearch] Add OpenSearch 3.x support with opensearch-java client Jan 22, 2026
Copy link
Copy Markdown
Member

@reta reta left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @uliandim , this is largely adapted from flink-connector-elasticsearch, right? (no issues here, just the fact it is the existing code that already works) Thank you

@uliandim uliandim force-pushed the main branch 2 times, most recently from 488195d to 2deaf99 Compare February 4, 2026 15:26
@reta
Copy link
Copy Markdown
Member

reta commented Feb 15, 2026

@snuyanzin could you help us here please? the implementation is pretty much aligned with https://github.com/apache/flink-connector-elasticsearch, thank you

@@ -0,0 +1,5 @@
flink-sql-connector-opensearch3
Copyright 2014-2024 The Apache Software Foundation
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Copyright 2014-2024 The Apache Software Foundation
Copyright 2014-2026 The Apache Software Foundation

?

Comment on lines +126 to +139
<version>${httpclient5.version}</version>
</dependency>

<!-- Explicit httpcore5 to avoid version conflicts -->
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-h2</artifactId>
<version>5.3.1</version>
</dependency>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally we have same version for all, should we reuse ${httpclient5.version} or have a separate property?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httpclient5 and httpcore5 do evolve separately, there is no version alignment

<optional>true</optional>
</dependency>

<!-- Opensearch Java Client (new official client) -->
Copy link
Copy Markdown
Contributor

@snuyanzin snuyanzin Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is new?
after half a year/1 or 2 years will it be still new?
Or should we have a bit different comment here, may be with mentioning a major version


public void assertThatIdsAreWritten(String index, Integer... ids) throws Exception {
// Wait for documents to be indexed
Thread.sleep(2000);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sure it is enough, ideally we should avoid such timeouts...


@Override
public Long map(Long value) throws Exception {
Thread.sleep(50);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this?

throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
}
indexGenerator.open();
objectMapper = new ObjectMapper();
Copy link
Copy Markdown
Contributor

@snuyanzin snuyanzin Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason we need to create it each time?
Can we instead having just one for all (static final) ?

Jackson javadoc says it should be ok if configuration is same https://github.com/FasterXML/jackson-databind/blob/16187a7ecf69fda4eacfab742ab74b07bd3d96d2/src/main/java/com/fasterxml/jackson/databind/ObjectMapper.java#L83-L88

Comment on lines +108 to +113
public Opensearch3SinkBuilder<IN> setHosts(HttpHost... hosts) {
checkNotNull(hosts);
checkState(hosts.length > 0, "Hosts cannot be empty.");
this.hosts = Arrays.asList(hosts);
return self();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand it correctly: right now it validates that

  1. hosts is not null
  2. hosts is not empty

how about case when hosts is not null and have 2 elements one of which is null and second is not null?

*/
public Opensearch3SinkBuilder<IN> setConnectionUsername(String username) {
checkNotNull(username);
this.username = username;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can user name be empty or blank?

* @param maxSizeMb the maximum size of buffered actions, in mb.
* @return this builder
*/
public Opensearch3SinkBuilder<IN> setBulkFlushMaxSizeMb(int maxSizeMb) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +59 to +75
private int bulkFlushMaxActions = 1000;
private int bulkFlushMaxMb = -1;
private long bulkFlushInterval = -1;
private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE;
private int bulkFlushBackoffRetries = -1;
private long bulkFlushBackOffDelay = -1;
private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
private List<HttpHost> hosts;
protected Opensearch3Emitter<? super IN> emitter;
private String username;
private String password;
private String connectionPathPrefix;
private Integer connectionTimeout;
private Integer connectionRequestTimeout;
private Integer socketTimeout;
private Boolean allowInsecure;
private Opensearch3FailureHandler failureHandler = new DefaultFailureHandler();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idea for follow up PR: I wonder if there should be a possibility to set these params via config options

@@ -0,0 +1,382 @@
<?xml version="1.0" encoding="UTF-8"?>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have dependency-reduced-pom.xml here?
it should be in .gitignore

@@ -0,0 +1,382 @@
<?xml version="1.0" encoding="UTF-8"?>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop this file, it should be in .gitignore

@@ -0,0 +1,382 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop this file, it should be in .gitignore

@snuyanzin
Copy link
Copy Markdown
Contributor

I put some minor comments
also it seems CI is not happy yet

@SubNader
Copy link
Copy Markdown

Hello everyone, we are very interested in this. Is there any blockers holding the release back?
Let me know if I can support.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment