0

I'm running spring boot with KafkaListener as my client. The question is how can we recover from a failed kafka configuration and avoid that the application stops with Process finished with exit code 0. An example of an incorrect config would e.g. an incorrect endpoint url. Same scenario would apply if the Kafka server would not be reachable. So in any case the KafkaListner process should never kill the server.

 @Bean open fun consumerFactory(): ConsumerFactory<String, String> { val deserializer = JsonDeserializer<Thing>() deserializer.addTrustedPackages("de.data.Thing") val props: MutableMap<String, Any> = HashMap() val serverUrl = server.substringBefore(":") props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server props[ConsumerConfig.GROUP_ID_CONFIG] = "group" props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL" props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false props[SaslConfigs.SASL_MECHANISM] = "PLAIN" props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " + "username=\"\$ConnectionString\" " + "password=\"Endpoint=sb://$serverUrl/;" + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";" return DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer()) } @Bean open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? { val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>() factory.consumerFactory = consumerFactory() factory.setMessageConverter(BytesJsonMessageConverter()) return factory } @KafkaListener(topics = ["topic"], groupId = "group", containerFactory = "kafkaListenerContainerFactory", ) fun listenThingsChanged(@Payload thing: Thing, record: ConsumerRecord<String, String>, @Headers headers: MessageHeaders) { .... } 

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) at de.x.ServerAppKt.main(ServerApp.kt:11) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ... 19 common frames omitted Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ... 33 common frames omitted

2
  • 1
    What do you expect it to do? If the configuration is wrong, it is wrong - how can it possibly recover from bad configuration? Commented Jul 14, 2021 at 13:38
  • The configuration is syntactically correct. But it can not start the KafkaListener process because the url of the server is wrong or not reachable. In that case I would like that the KafkaListener process does not crash the whole server. Because my server does fulfil other tasks as well. Commented Jul 14, 2021 at 15:23

1 Answer 1

2

If the broker is just down, the application will start fine (with versions earlier than 2.3.4 you had to set missingTopicsFatal to false on the container properties (it has been false by default since then).

No resolvable bootstrap urls given in...

This is fatal - it is irrecoverable.

However, you can set autoStartup=false - either on the @KafkaListener or on the container factory.

This will prevent Spring from trying to start the containers during application initialization.

You can then start the containers yourself in a try/catch block.

Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.