I am running a small Flink application that periodically polls data from my Oracle DB and writes it to a Kafka topic. When I comment out the Kafka sink shown below and run the rest of the job on its own, the application runs correctly:

    KafkaSink<String> stringSink = KafkaSinkFactory.createStringKafkaSink(
            props, kafkaProps, props.getProperty("asset.sync.kafka.sink.topic.name"));

    dbStream
        .map(Object::toString) // or map your custom object to String
        .sinkTo(stringSink)
        .name("String Kafka Sink");

But running the source and the sink together throws this exception:

    java.sql.SQLRecoverableException: IO Error: Socket read interrupted, Authentication lapse 0 ms.
        at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:874)
        at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
        at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
        at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
        at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562)
        at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138)
        at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
        at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
        at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
        at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
        at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
        at com.zaxxer.hikari.HikariDataSource.<init>(HikariDataSource.java:81)
        at ***.***.hcmp.config.DBConfig.getDataSource(DBConfig.java:22)
        at ***.***.hcmp.source.assetMaster.DBPollingSource.open(DBPollingSource.java:62)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.io.IOException: Socket read interrupted, Authentication lapse 0 ms.
        at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:870)
        ... 25 common frames omitted
    Caused by: java.io.InterruptedIOException: Socket read interrupted
        at oracle.net.nt.TimeoutSocketChannel.handleInterrupt(TimeoutSocketChannel.java:258)
        at oracle.net.nt.TimeoutSocketChannel.read(TimeoutSocketChannel.java:180)
        at oracle.net.ns.NSProtocolNIO.doSocketRead(NSProtocolNIO.java:555)
        at oracle.net.ns.NIOPacket.readNIOPacket(NIOPacket.java:403)
        at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:127)
        at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340)
        at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596)
        at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588)

Here is the application code:

    public class RefreshPollingApplication {

        private static final Logger log = LoggerFactory.getLogger(RefreshPollingApplication.class);

        public static void main(String[] args) throws Exception {
            ParameterTool params = ParameterTool.fromPropertiesFile(
                    RefreshPollingApplication.class.getClassLoader().getResourceAsStream("application-local.properties")
            );

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(1);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    3,
                    Time.of(10, TimeUnit.SECONDS)
            ));

            try {
                Properties props = ConfigLoader.load("application-local.properties");
                Properties kafkaProps = getKafkaProperties(props);
                log.info("@@@@@@@@@@@@@ Properties loaded");

                SimpleDBConfig dbConfig = new SimpleDBConfig(
                        params.get("flink.jdbc.url"),
                        params.get("flink.jdbc.username"),
                        params.get("flink.jdbc.password"));
                Connection connection = dbConfig.getConnection();

                DataStream<String> dbStream = env.addSource(
                        new JdbcPollingSource()
                ).setParallelism(1).name("DB Polling Source");

                KafkaSink<String> stringSink = KafkaSinkFactory.createStringKafkaSink(
                        props, kafkaProps, props.getProperty("****.***.kafka.sink.topic.name"));

                dbStream
                    .map(Object::toString)
                    .sinkTo(stringSink)
                    .name("String Kafka Sink");

                env.execute("Refresh polling job.");
            } catch (Exception e) {
                log.error("@@@@@@@@ERROR OCCURRED ON POLLING TASK, REASON: {}", e.getMessage());
                throw e;
            }
        }
    }

Source function class:

    public class JdbcPollingSource extends RichSourceFunction<String> {

        private volatile boolean isRunning = true;
        private transient Connection connection;
        private transient PreparedStatement stmt;
        private String url;
        private String userName;
        private String password;

        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName("oracle.jdbc.OracleDriver");
            super.open(parameters);

            Map<String, String> globalParams = getRuntimeContext()
                    .getExecutionConfig()
                    .getGlobalJobParameters()
                    .toMap();
            ParameterTool params = ParameterTool.fromMap(globalParams);

            this.url = params.get("flink.jdbc.url");
            userName = params.get("flink.jdbc.username");
            password = params.get("flink.jdbc.password");
            reconnect();
        }

        private void reconnect() throws Exception {
            if (connection != null && !connection.isClosed()) {
                try {
                    connection.close();
                } catch (Exception ignored) {}
            }
            try {
                connection = DriverManager.getConnection(
                        url,
                        userName,
                        password
                );
                stmt = connection.prepareStatement(
                        "*******"
                );
            } catch (Exception e) {
                System.out.println("Error connecting to data source, exception -> " + e.getMessage());
            }
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning) {
                try {
                    ResultSet rs = stmt.executeQuery();
                    while (rs.next()) {
                        String record = rs.getLong("***") + "," + rs.getString("***");
                        ctx.collect(record);
                    }
                } catch (SQLException e) {
                    reconnect();
                }
                Thread.sleep(120_000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            try { if (stmt != null) stmt.close(); } catch (Exception ignore) {}
            try { if (connection != null) connection.close(); } catch (Exception ignore) {}
        }
    }

Is it possible that a background thread is interfering with the DB connection here?
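
To make the question concrete: the innermost cause in the trace is `java.io.InterruptedIOException: Socket read interrupted`, which suggests that something called `Thread.interrupt()` on the task thread while it was blocked in the Oracle logon. One candidate I can think of (an assumption, not something I have confirmed) is Flink itself cancelling the source task because another part of the job, possibly the Kafka sink, failed during startup. The sketch below is not my job code. It is a standalone illustration of the mechanism only, using a `java.nio.channels.Pipe` as a stand-in for the JDBC socket. The class name `InterruptDemo`, the method `readWasInterrupted`, and the 200 ms delay are made up for the example.

```java
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptDemo {

    // Returns true if the worker's blocking read was aborted by a thread interrupt.
    public static boolean readWasInterrupted() throws Exception {
        // Nothing is ever written to the pipe, so read() blocks indefinitely,
        // much like a socket read waiting for the database to answer the logon.
        Pipe pipe = Pipe.open();
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                pipe.source().read(ByteBuffer.allocate(16));
            } catch (ClosedByInterruptException e) {
                // Interruptible channels report an interrupt this way.
                interrupted.set(true);
            } catch (Exception e) {
                // Any other failure leaves the flag false.
            }
        }, "blocked-reader");

        worker.start();
        Thread.sleep(200);   // give the worker time to block in read()
        worker.interrupt();  // stands in for whatever interrupts the task thread
        worker.join(5_000);

        pipe.sink().close();
        pipe.source().close();
        return interrupted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read interrupted: " + readWasInterrupted());
    }
}
```

If this is what is happening in my job, the `SQLRecoverableException` would be a symptom rather than the root cause, and the first exception in the JobManager/TaskManager logs (before the source task is cancelled) would be the one to look at.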
