
There is a Spring Boot application with the Kafka dependency. There are two Kafka topics, and I need to read messages from them:

`tacocloud.orders.topic`
`tacocloud.tacos.topic`

Messages have already been sent to both topics successfully.

I configured Kafka to listen to these topics like this:

```java
package com.example.tacocloud.config;

import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {

    // Order
    @Bean
    @ConfigurationProperties("spring.kafka.consumer.order")
    public Map<String, Object> orderConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
            @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }

    // Taco
    @Bean
    @ConfigurationProperties("spring.kafka.consumer.taco")
    public Map<String, Object> tacoConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory tacoConsumerFactory(
            @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
            @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
            @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(tacoConsumerFactory);
        return factory;
    }
}
```

So there are two DefaultKafkaConsumerFactory beans and two ConcurrentKafkaListenerContainerFactory beans. After that, I created a service with @KafkaListener methods that log the messages:

```java
package com.example.tacocloud.service;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaService {

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics = "tacocloud.tacos.topic", groupId = "two")
    public void taco() {
        System.out.println("Taco");
    }
}
```

Here is the application.yml file:

```yaml
spring:
  kafka:
    consumer:
      order:
        topic: tacocloud.orders.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
        template:
          "[default.topic]": tacocloud.orders.topic
      taco:
        topic: tacocloud.tacos.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
        template:
          "[default.topic]": tacocloud.tacos.topic
```
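For comparison, the standard Spring Boot consumer keys (which Boot binds automatically, but which only allow a single value deserializer for the whole application) would look something like the sketch below; I used the custom per-topic sections above because the two topics need different value deserializers:

```yaml
# Standard auto-configured keys, shown only for comparison; they cannot
# express two different value deserializers:
spring:
  kafka:
    bootstrap-servers: localhost:29888
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.example.tacocloud.model.serialization.OrderDeserializer
```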

But when I start my Spring Boot application, I see this error message:

```
2022-04-15 21:38:35.918 ERROR 13888 --- [  restartedMain] o.s.boot.SpringApplication : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.16.jar:5.3.16]
	at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.4.jar:2.6.4]
	at com.example.tacocloud.TacoCloudApplication.main(TacoCloudApplication.java:10) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:640) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-2.8.0.jar:na]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:758) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.16.jar:5.3.16]
	... 19 common frames omitted
```

Process finished with exit code 0

  • `Missing required configuration "key.deserializer" which has no default value` - does it say anything to you? We just don't know what your `spring.kafka.consumer.order` properties are, so it might not be a surprise that some required props are missing. Commented Apr 15, 2022 at 19:00
  • @ArtemBilan added application.yml Commented Apr 15, 2022 at 19:02
  • I'm not so good with YAML to be sure about that `"[key.deserializer]"` syntax. Also, I've never seen @ConfigurationProperties on a Map bean. Any chance to share a simple project with us so we can reproduce and play with it? Commented Apr 15, 2022 at 19:17
  • @ArtemBilan placed it on GitHub: github.com/VOsipenkov/Kafka-issue Commented Apr 15, 2022 at 19:39

2 Answers


Thank you for a sample.

So, I opened it locally and put a breakpoint in this bean definition:

```java
@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(
        @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
    return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}
```

That orderConsumerConfig map looks like this:

```
orderConsumerConfig = {LinkedHashMap@8587} size = 1
  "orderConsumerConfig" -> {HashMap@8600} size = 5
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    "template" -> {LinkedHashMap@8611} size = 1
    "topic" -> "tacocloud.orders.topic"
    "bootstrap.servers" -> "localhost:29888"
    "value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"
```

So it's indeed no surprise that your KafkaConsumer is not initialized properly: your target config map is hidden under the orderConsumerConfig entry of the injected map, instead of sitting at its top level.
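The shape of the problem can be reproduced without Spring at all. The sketch below is plain java.util code (the literal values are copied from the debugger output above): KafkaConsumer validates top-level keys of the map it is given, but the real settings sit one level down under the bean-name key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java reproduction of the injection problem; no Spring or Kafka needed.
public class NestedConfigDemo {

    // Mimics what the debugger showed: the real consumer settings end up
    // nested under an entry named after the injected bean.
    static Map<String, Object> injectedMap() {
        Map<String, Object> realConfig = new HashMap<>();
        realConfig.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        realConfig.put("bootstrap.servers", "localhost:29888");

        Map<String, Object> injected = new HashMap<>();
        injected.put("orderConsumerConfig", realConfig);
        return injected;
    }

    public static void main(String[] args) {
        Map<String, Object> injected = injectedMap();
        // KafkaConsumer looks the property up at the top level and finds nothing:
        System.out.println("top level: " + injected.get("key.deserializer")); // null
        // The value it needed is one level down:
        Map<?, ?> nested = (Map<?, ?>) injected.get("orderConsumerConfig");
        System.out.println("nested: " + nested.get("key.deserializer"));
    }
}
```

This is exactly the "Missing required configuration" failure: the key exists, just not where the consumer looks for it.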

Would you mind sharing where you took the idea of @ConfigurationProperties on a Map bean from, and how that is supposed to be used for dependency injection?


1 Comment

Thank you very much. It was incorrect on my side to use a Map in the configuration.

I wanted to do something similar (configure multiple ConsumerFactory instances based on properties). I used @ConfigurationProperties to create a Map<String, String> instead of a Map<String, Object>, and then copied the items of that map into a new Map<String, Object>. I'm not sure why Spring Boot loaded the Map<String, Object> that way.

```java
@Bean
@ConfigurationProperties("taco-cart.kafka")
public Map<String, String> tacoCartKafkaProperties() {
    return new HashMap<>();
}

@Bean
public ConsumerFactory<String, TacoCart> tacoCartConsumerFactory(
        @Qualifier("tacoCartKafkaProperties") Map<String, String> tacoCartKafkaProperties) {
    // Convert map.
    Map<String, Object> config = new HashMap<>();
    config.putAll(tacoCartKafkaProperties);
    return new DefaultKafkaConsumerFactory<>(config);
}
```
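The conversion step can be checked in isolation. Below is a plain java.util sketch (the property values are placeholders standing in for the bound configuration) showing that copying a Map<String, String> into a fresh Map<String, Object> keeps the entries flat, so the consumer finds its required keys at the top level:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone check of the copy step: the resulting map is flat,
// with no wrapper entry named after the bean.
public class FlattenCheck {

    static Map<String, Object> toObjectMap(Map<String, String> props) {
        Map<String, Object> config = new HashMap<>();
        config.putAll(props);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Placeholder values standing in for the bound properties:
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", "localhost:29888");

        Map<String, Object> config = toObjectMap(props);
        // Found at the top level, unlike with the Map<String, Object> bean:
        System.out.println(config.get("key.deserializer"));
    }
}
```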
