1

I am testing the sample code of Spring Kafka. It works fine with PLAINTEXT connection, but doesn't work with SSL connection.

I have verified that key and certificate are valid for kafka broker by successfully running a console consumer:

bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:50070 --topic myTopic --consumer.config client-ssl.properties --from-beginning 

But I cannot send message using Spring Boot(2.0.1.RELEASE) with Spring Kafka, using the same key and certificate.

2018-04-25 15:20:39.126 INFO 9672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1 2018-04-25 15:20:39.126 INFO 9672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e 2018-04-25 15:20:39.131 INFO 9672 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 2018-04-25 15:20:39.231 INFO 9672 --- [ main] c.c.m.k.e.producer.ProducerApplication : Started ProducerApplication in 1.553 seconds (JVM running for 5.029) 2018-04-25 15:20:39.238 INFO 9672 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [xx.xx.xx.xx:50070] buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 0 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = SSL send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2018-04-25 15:20:39.257 INFO 9672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1 2018-04-25 15:20:39.257 INFO 9672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e 2018-04-25 15:21:39.265 ERROR 9672 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='foo1' to topic myTopic: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 2018-04-25 15:22:39.268 ERROR 9672 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='foo2' to topic myTopic: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 2018-04-25 15:23:39.268 ERROR 9672 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='foo3' to topic myTopic: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 2018-04-25 15:24:39.273 INFO 9672 --- [ main] c.c.m.k.e.producer.ProducerApplication : All received 2018-04-25 15:29:39.482 ERROR 9672 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1362) ~[na:1.8.0_20] at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:529) ~[na:1.8.0_20] at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1194) ~[na:1.8.0_20] at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1166) ~[na:1.8.0_20] at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469) ~[na:1.8.0_20] at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:168) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.doClose(Selector.java:739) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.close(Selector.java:727) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.maybeCloseOldestConnection(Selector.java:630) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.poll(Selector.java:427) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.1.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20] Caused by: java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:90) ~[na:1.8.0_20] at sun.security.validator.Validator.getInstance(Validator.java:179) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.getValidator(X509TrustManagerImpl.java:312) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.checkTrustedInit(X509TrustManagerImpl.java:171) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:239) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) ~[na:1.8.0_20] at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1356) ~[na:1.8.0_20] at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:156) ~[na:1.8.0_20] at sun.security.ssl.Handshaker.processLoop(Handshaker.java:925) ~[na:1.8.0_20] at sun.security.ssl.Handshaker$1.run(Handshaker.java:865) ~[na:1.8.0_20] at sun.security.ssl.Handshaker$1.run(Handshaker.java:862) ~[na:1.8.0_20] at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_20] at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1302) ~[na:1.8.0_20] at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:389) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:469) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.poll(Selector.java:412) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219) ~[kafka-clients-1.0.1.jar:na] ... 8 common frames omitted Caused by: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty at java.security.cert.PKIXParameters.setTrustAnchors(PKIXParameters.java:200) ~[na:1.8.0_20] at java.security.cert.PKIXParameters.<init>(PKIXParameters.java:120) ~[na:1.8.0_20] at java.security.cert.PKIXBuilderParameters.<init>(PKIXBuilderParameters.java:104) ~[na:1.8.0_20] at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:88) ~[na:1.8.0_20] ... 32 common frames omitted 2018-04-25 15:30:09.284 ERROR 9672 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread: java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1362) ~[na:1.8.0_20] at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:529) ~[na:1.8.0_20] at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1194) ~[na:1.8.0_20] at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1166) ~[na:1.8.0_20] at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469) ~[na:1.8.0_20] at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:168) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.doClose(Selector.java:739) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.close(Selector.java:727) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.maybeCloseOldestConnection(Selector.java:630) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.poll(Selector.java:427) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20] Caused by: java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:90) ~[na:1.8.0_20] at sun.security.validator.Validator.getInstance(Validator.java:179) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.getValidator(X509TrustManagerImpl.java:312) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.checkTrustedInit(X509TrustManagerImpl.java:171) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:239) ~[na:1.8.0_20] at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) ~[na:1.8.0_20] at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1356) ~[na:1.8.0_20] at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:156) ~[na:1.8.0_20] at sun.security.ssl.Handshaker.processLoop(Handshaker.java:925) ~[na:1.8.0_20] at sun.security.ssl.Handshaker$1.run(Handshaker.java:865) ~[na:1.8.0_20] at sun.security.ssl.Handshaker$1.run(Handshaker.java:862) ~[na:1.8.0_20] at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_20] at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1302) ~[na:1.8.0_20] at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:389) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:469) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474) ~[kafka-clients-1.0.1.jar:na] at org.apache.kafka.common.network.Selector.poll(Selector.java:412) ~[kafka-clients-1.0.1.jar:na] ... 4 common frames omitted Caused by: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty at java.security.cert.PKIXParameters.setTrustAnchors(PKIXParameters.java:200) ~[na:1.8.0_20] at java.security.cert.PKIXParameters.<init>(PKIXParameters.java:120) ~[na:1.8.0_20] at java.security.cert.PKIXBuilderParameters.<init>(PKIXBuilderParameters.java:104) ~[na:1.8.0_20] at sun.security.validator.PKIXValidator.<init>(PKIXValidator.java:88) ~[na:1.8.0_20] ... 23 common frames omitted 

application.properties

spring.kafka.bootstrap-servers=xx.xx.xx.xx:50070 spring.kafka.consumer.group-id=aaa spring.kafka.properties.security.protocol=SSL spring.kafka.consumer.ssl.keystore-location=classpath:client.keystore.jks spring.kafka.consumer.ssl.keystore-password=1234 spring.kafka.consumer.ssl.key-password=1234 

Does anyone successfully configure Spring Boot 2.0 + Spring Kafka with SSL?

1 Answer 1

2
ssl.keystore.location = null ssl.keystore.password = null 

You are only configuring SSL for the consumer...

spring.kafka.consumer.ssl.keystore-location=classpath:client.keystore.jks spring.kafka.consumer.ssl.keystore-password=1234 spring.kafka.consumer.ssl.key-password=1234 

Use spring.kafka.ssl. ... for properties common to both producers and consumers. If you don't have a consumer, use spring.kafka.producer.ssl. ....

Sign up to request clarification or add additional context in comments.

2 Comments

Could you help find documents for configuration? It seems a little different from kafka configuration (eg., keystore.location vs. keystore-location). Thanks.
See the boot documentation. Boot supports a subset of the kafka properties in a first-class manner - this allows some extra functionality, such as converting spring Resources (e.g. classpath:... for the keystore) to a file system absolute path - Kafka has no way to find the file on the classpath.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.