
The Camel Kafka component is unable to read messages with SSL enabled, and it is not producing any errors either. Below is my route:

Could anyone please help me resolve this kind of issue? There are no error/failure logs to go on.

from("kafka:testtopic9?brokers=<domain-name>:9092" + "&groupId=test" + "&sslKeyPassword=12345" + "&sslKeystorePassword=12345" + "&securityProtocol=SASL_SSL" + "&sslTruststoreLocation=kafka.client.truststore.jks" + "&saslMechanism=PLAIN" + "&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer" + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1") .log("fetching data from broker :: ${body}") 

The logs only show the config property values; there are no further entries such as subscribing to the topic, etc.:

<pre> 03-12-2020 12:56:06.871 [main] INFO o.s.s.c.ThreadPoolTaskExecutor.initialize - Initializing ExecutorService 'applicationTaskExecutor' 03-12-2020 12:56:07.538 [main] INFO o.a.c.i.c.DefaultTypeConverter.doStart - Type converters loaded (core: 195, classpath: 14) 03-12-2020 12:56:07.899 [main] INFO o.a.coyote.http11.Http11NioProtocol.log - Starting ProtocolHandler ["http-nio-8080"] 03-12-2020 12:56:07.923 [main] INFO o.s.b.w.e.tomcat.TomcatWebServer.start - Tomcat started on port(s): 8080 (http) with context path '' 03-12-2020 12:56:07.942 [main] INFO o.a.c.spring.boot.RoutesCollector.loadXmlRoutes - Loading additional Camel XML routes from: classpath:camel/*.xml 03-12-2020 12:56:07.942 [main] INFO o.a.c.spring.boot.RoutesCollector.loadXmlRests - Loading additional Camel XML rests from: classpath:camel-rest/*.xml 03-12-2020 12:56:07.951 [main] INFO o.a.camel.spring.SpringCamelContext.start - Apache Camel 2.25.2 (CamelContext: camel-1) is starting 03-12-2020 12:56:07.952 [main] INFO o.a.c.m.ManagedManagementStrategy.doStart - JMX is enabled 03-12-2020 12:56:08.104 [main] INFO o.a.camel.spring.SpringCamelContext.doStartCamel - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html 03-12-2020 12:56:08.135 [main] INFO o.a.c.component.kafka.KafkaConsumer.doStart - Starting Kafka consumer on topic: testtopic9 with breakOnFirstError: false 03-12-2020 12:56:08.145 [main] INFO o.a.camel.spring.SpringCamelContext.doStartOrResumeRouteConsumers - Route: route1 started and consuming from: kafka://testtopic9?brokers=<domain-name>%3A9092&groupId=test&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslKeyPassword=xxxxxx&sslKeystorePassword=xxxxxx&sslTruststoreLocation=C%3A%5CUsers%5CSRJANA%5CDesktop%5CKafka%5Ckafka.client.truststore.jks&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer 03-12-2020 12:56:08.148 [main] INFO o.a.camel.spring.SpringCamelContext.start - Total 1 routes, of which 1 are started 03-12-2020 12:56:08.149 [main] INFO o.a.camel.spring.SpringCamelContext.start - Apache Camel 2.25.2 (CamelContext: camel-1) started in 0.198 seconds 03-12-2020 12:56:08.151 [main] INFO c.c.cdc.CDCPostProcessorApplication.logStarted - Started CDCPostProcessorApplication in 3.409 seconds (JVM running for 4.561) 03-12-2020 12:56:08.160 [Camel (camel-1) thread #1 - KafkaConsumer[testtopic9]] INFO o.a.k.c.consumer.ConsumerConfig.logAll - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [ <domain-name> ] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = e0a4fadb-5e12-49ab-87d5-3b124d3e1c76 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 50 reconnect.backoff.ms = 50 request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer </pre> 
  • Maybe increase the log verbosity and see if it adds some details about what's going on. Commented Dec 3, 2020 at 8:50
  • @LucaBurgazzoli, could you please suggest how to do that? (See the logging note below.) Commented Dec 3, 2020 at 8:55
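
One way to raise the verbosity, assuming the application relies on Spring Boot's default logging setup (the startup log in the question shows Spring Boot and Tomcat starting), is to bump the Kafka client and Camel Kafka loggers to DEBUG in application.properties. This is only a sketch of the logging change, not a full configuration:

<pre>
# sketch: assumes Spring Boot's logging.level.* properties are in effect
logging.level.org.apache.kafka=DEBUG
logging.level.org.apache.camel.component.kafka=DEBUG
</pre>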

1 Answer


Are you sure you actually have a problem here? If the SSL connection to the broker did not work, I would expect the route to fail, but the route has started.

You have auto.offset.reset = latest in your consumer settings. That means the consumer ignores all existing messages when it connects for the first time.

Have you sent a message to the topic after the consumer was started and connected?
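
If you do want the consumer to read messages that were already on the topic, the endpoint from the question could be extended with the Camel Kafka autoOffsetReset option (which maps to the Kafka auto.offset.reset setting). This is only an untested sketch of that change:

<pre>
from("kafka:testtopic9?brokers=<domain-name>:9092"
        + "&groupId=test"
        // same security options as in the question
        + "&securityProtocol=SASL_SSL"
        + "&saslMechanism=PLAIN"
        + "&sslTruststoreLocation=kafka.client.truststore.jks"
        + "&sslKeystorePassword=12345"
        + "&sslKeyPassword=12345"
        + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1\";"
        + "&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer"
        + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
        // read from the beginning of the topic instead of the default "latest"
        + "&autoOffsetReset=earliest")
    .log("fetching data from broker :: ${body}");
</pre>

With a group id that has no committed offsets yet, earliest makes the consumer start from the oldest available record instead of waiting for new ones.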


4 Comments

Thanks for the reply. Yes, after I connected to the topic, I pushed some records to it.
Sorry, I don't fully understand your comment. Was my answer correct, or is the problem something else?
I'm pushing the records, but it's still not working.
I got the solution from the Camel community: I used different versions for the components instead of the same version.
