
I have a Spring Boot app that has a Kafka consumer and producer in it. There's also a bean to create a topic.

e.g.

@KafkaListener(topics = "myTopic")
public void doSomething() {
    // do something on receipt of the message
}

@Bean
public NewTopic topic() {
    return TopicBuilder.name("myTopic")
            .partitions(2)
            .

Both my Spring Boot app and Kafka start up in Docker in Kubernetes. Sometimes the Spring Boot app starts up before the Kafka pod is up and therefore fails to start as the consumer cannot connect (see stacktrace).

Is there a way for my application to start up in a resilient manner? For example, the consumer should cope with Kafka not being available at startup or while the app is running.

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
  • It is bad practice to start a service when its dependencies aren't started yet. It could lead to failed calls. Commented Sep 24, 2020 at 8:23
  • On the contrary, it is a good practice to start a service even if dependencies are unavailable. An error response signaling what's wrong with the service is better than no response at all caused by a service that isn't started. Plus, refusing to start but continuing to run when a backing service fails after your service has already started introduces an inconsistency - you get different behavior for the same situation. Commented Feb 21, 2022 at 14:42

1 Answer


You can set autoStartup = "false" on the listener and start it yourself using the KafkaListenerEndpointRegistry. Give the listener an id so you can get a reference to its container from the registry.

If the broker is not available, the KafkaAdmin won't create the topic; you will also need to call KafkaAdmin.initialize():

/**
 * Call this method to check/add topics; this might be needed if the broker was not
 * available when the application context was initialized, and
 * {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
 * or {@link #setAutoCreate(boolean) autoCreate} was set to false.
 * @return true if successful.
 * @see #setFatalIfBrokerNotAvailable(boolean)
 * @see #setAutoCreate(boolean)
 */
public final boolean initialize() {

2 Comments

Thanks - so I'd have to do some sort of polling in an async task waiting for Kafka to come up and then start up the listener and initialize once it's up ?
Right; you can call kafkaAdmin.initialize() in a loop until it succeeds and then start the container.
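
The polling approach discussed in these comments could look roughly like the sketch below. It is plain Java with no Spring dependency, so the readiness check and the start action are passed in as lambdas. The class name BrokerGate, the method name awaitThenRun, and the attempt and delay values are all hypothetical and not part of Spring Kafka. The sketch also treats an exception from the check the same as a false result, on the assumption that the check might throw instead of returning false while the broker is unreachable.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a readiness check, then runs a start action once. */
final class BrokerGate {

    private BrokerGate() {
    }

    /**
     * Calls brokerReady up to maxAttempts times, sleeping for delay between attempts.
     * Runs startListener as soon as the check returns true.
     *
     * @return true if startListener was run, false if attempts ran out or the thread was interrupted
     */
    static boolean awaitThenRun(BooleanSupplier brokerReady, Runnable startListener,
            int maxAttempts, Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            boolean ready;
            try {
                ready = brokerReady.getAsBoolean();
            } catch (RuntimeException e) {
                // Assumption: a throwing check means "not ready yet", so keep polling.
                ready = false;
            }
            if (ready) {
                startListener.run();
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for the real check: reports "not ready" twice, then "ready".
        int[] checks = {0};
        boolean started = awaitThenRun(
                () -> ++checks[0] >= 3,
                () -> System.out.println("listener container started"),
                10, Duration.ofMillis(100));
        System.out.println("started=" + started + " after " + checks[0] + " checks");
    }
}
```

In the Spring application, the check would presumably be kafkaAdmin::initialize and the start action something like () -> registry.getListenerContainer("myListener").start(), where "myListener" is a made-up listener id and registry is the injected KafkaListenerEndpointRegistry. Running the call on a background thread keeps application startup from blocking while Kafka is down.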
