There is spring-boot application with kafka dependecy, there are two Kafka topics and need to read messages from them
tacocloud.orders.topic tacocloud.tacos.topic And already successful sent messages in it
Configured kafka config for listen this topics like this
package com.example.tacocloud.config; import com.example.tacocloud.model.jpa.Order; import com.example.tacocloud.model.jpa.Taco; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.MessageListener; import java.util.HashMap; import java.util.Map; @Slf4j @Configuration @EnableKafka @EnableConfigurationProperties public class KafkaConfig { // Order @Bean @ConfigurationProperties("spring.kafka.consumer.order") public Map<String, Object> orderConsumerConfig() { return new HashMap<>(); } @Bean public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) { return new DefaultKafkaConsumerFactory<>(orderConsumerConfig); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer( @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig, @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(orderConsumerFactory); return factory; } // Taco @Bean @ConfigurationProperties("spring.kafka.consumer.taco") public Map<String, Object> tacoConsumerConfig() { return new HashMap<>(); } @Bean public DefaultKafkaConsumerFactory tacoConsumerFactory( @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) { return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig); } @Bean public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer( @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig, @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(tacoConsumerFactory); return factory; } } So, there are two DefaultKafkaConsumerFactory and two ConcurrentKafkaListenerContainerFactory Aften that, created a service with @KafkaListener log messages:
package com.example.tacocloud.service; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service @EnableKafka public class KafkaService { @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one") public void order() { System.out.println("Order"); } @KafkaListener(topics ="tacocloud.tacos.topic", groupId = "two") public void taco() { System.out.println("Taco"); } } application.yml file
spring: kafka: consumer: order: topic: tacocloud.orders.topic "[bootstrap.servers]": localhost:29888 "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer template: "[default.topic]": tacocloud.orders.topic taco: topic: tacocloud.tacos.topic "[bootstrap.servers]": localhost:29888 "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer template: "[default.topic]": tacocloud.tacos.topic But, when I start my spring-boot application, I see error message(((
2022-04-15 21:38:35.918 ERROR 13888 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.16.jar:5.3.16] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.4.jar:2.6.4] at com.example.tacocloud.TacoCloudApplication.main(TacoCloudApplication.java:10) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.6.4.jar:2.6.4] Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:640) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:665) ~[kafka-clients-2.8.0.jar:na] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:758) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.16.jar:5.3.16] ... 19 common frames omitted
Process finished with exit code 0
spring.kafka.consumer.order. So, it might not be a surprise that there are no some required props provided."[key.deserializer]"syntax. Also I've never seen that@ConfigurationPropertieson aMapbean. Any chances to share with us a simple project to let us reproduce and play with.