I'm using spring boot 2.2.4-RELEASE, spring-kafka 2.4.2.RELEASE

My scenario is the following one: 

In my microservice (let's call it producer microservice) I need to create kakfa topic and then, on some circumstances, I need to send message over a single topic.

This message must be received and handled by another microservice (let's call it consumer microservice). In this consumer microservice I must create kafka-listener every time a new topic is created on the server side.

So I wrote the folloqing code

**producer microservice** 

***spring kafka config***:

 @Configuration
 public class WebmailKafkaConfig {
 @Autowired
 private Environment environment;
 @Bean
 public KafkaAdmin kafkaAdmin(){
 Map<String, Object> configuration = new HashMap<String, Object>();
 configuration.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("webmail.be.messaging.kafka.bootstrap.address"));
 KafkaAdmin result = new KafkaAdmin(configuration);
 return result;
 }
 @Bean
 public ProducerFactory<String, RicezioneMailMessage> producerFactory() {
 Map<String, Object> configProps = new HashMap<>();
 configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("webmail.be.messaging.kafka.bootstrap.address"));
 configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 //configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
 return new DefaultKafkaProducerFactory<>(configProps);
 }
 
 @Bean("ricezioneMailMessageKafkaTemplate")
 public KafkaTemplate<String, RicezioneMailMessage> ricezioneMailMessageKafkaTemplate() {
 return new KafkaTemplate<>(producerFactory());
 }
 }

***spring service kafka manager***

 @Service
 public class WebmailKafkaTopicSvcImpl implements WebmailKafkaTopicSvc {
 private static final Logger logger = LoggerFactory.getLogger(WebmailKafkaTopicSvcImpl.class.getName());
 @Autowired
 private KafkaAdmin kafkaAdmin;
 @Value("${webmail.be.messaging.kafka.topic.numero.partizioni}")
 private int numeroPartizioni;
 @Value("${webmail.be.messaging.kafka.topic.fattore.replica}")
 private short fattoreReplica;
 @Autowired
 @Qualifier("ricezioneMailMessageKafkaTemplate")
 private KafkaTemplate<String, RicezioneMailMessage> ricezioneMailMessageKafkaTemplate;
 @Override
 public void createKafkaTopic(String topicName) throws Exception {
 if(!StringUtils.hasText(topicName)){
 throw new IllegalArgumentException("Passato un topic name non valido ["+topicName+"]");
 }
 AdminClient adminClient = null;
 try{
 adminClient = AdminClient.create(kafkaAdmin.getConfig());
 List<NewTopic> topics = new ArrayList<>(1);
 NewTopic topic = new NewTopic(topicName, numeroPartizioni, fattoreReplica);
 topics.add(topic);
 CreateTopicsResult result = adminClient.createTopics(topics);
 result.all().whenComplete(new KafkaFuture.BiConsumer<Void, Throwable>() {
 @Override
 public void accept(Void aVoid, Throwable throwable) {
 if( throwable != null ){
 logger.error("Errore creazione topic", throwable);
 }
 }
 });
 
 }finally {
 if( adminClient != null ){
 adminClient.close();
 }
 }
 }
 
 @Override
 public void sendMessage(RicezioneMailMessage rmm) throws Exception {
 ListenableFuture<SendResult<String, RicezioneMailMessage>> future = ricezioneMailMessageKafkaTemplate.send(rmm.getPk(), rmm);
 future.addCallback(new ListenableFutureCallback<SendResult<String, RicezioneMailMessage>>() {
 @Override
 public void onFailure(Throwable ex) {
 if( logger.isWarnEnabled() ){
 logger.warn("Impossibile inviare il messaggio=["
 + rmm + "] a causa di : " + ex.getMessage(),ex);
 }
 }
 
 @Override
 public void onSuccess(SendResult<String, RicezioneMailMessage> result) {
 if(logger.isTraceEnabled()){
 logger.trace("Inviato messaggio=[" + rmm +
 "] con offset=[" + result.getRecordMetadata().offset() + "]");
 }
 }
 });
 }
 }

In the producer side all works pretty good. I'm able in creating topics and sending messages.


**consumer microservice**

***dynamic listener class***

 public class DynamicKafkaConsumer {
 private final String brokerAddress;
 private final String topicName;
 private boolean stopTest;
 private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaConsumer.class.getName());
 public DynamicKafkaConsumer(String brokerAddress, String topicName) {
 if( !StringUtils.hasText(brokerAddress)){
 throw new IllegalArgumentException("Passato un broker address non valido");
 }
 if( !StringUtils.hasText(topicName)){
 throw new IllegalArgumentException("Passato un topicName non valido");
 }
 this.brokerAddress = brokerAddress;
 this.topicName = topicName;
 if( logger.isTraceEnabled() ){
 logger.trace("Creato {} con topicName {} e brokerAddress {}", this.getClass().getName(), this.topicName, this.brokerAddress);
 }
 }
 public final void start() {
 MessageListener<String, RicezioneMailMessage> messageListener = (record -> {
 RicezioneMailMessage messaggioRicevuto = record.value();
 
 if( logger.isInfoEnabled() ){
 logger.info("Ricevuto messaggio {} su topic {}", messaggioRicevuto, topicName);
 }
 stopTest = true;
 });
 ConcurrentMessageListenerContainer<String, RicezioneMailMessage> container =
 new ConcurrentMessageListenerContainer<>(
 consumerFactory(brokerAddress),
 containerProperties(topicName, messageListener));
 
 container.start();
 }
 private DefaultKafkaConsumerFactory<String, RicezioneMailMessage> consumerFactory(String brokerAddress) {
 return new DefaultKafkaConsumerFactory<>(
 consumerConfig(brokerAddress),
 new StringDeserializer(),
 new JsonDeserializer<>(RicezioneMailMessage.class));
 }
 
 private ContainerProperties containerProperties(String topic, MessageListener<String, RicezioneMailMessage> messageListener) {
 ContainerProperties containerProperties = new ContainerProperties(topic);
 containerProperties.setMessageListener(messageListener);
 return containerProperties;
 }
 
 private Map<String, Object> consumerConfig(String brokerAddress) {
 return Map.of(
 BOOTSTRAP_SERVERS_CONFIG, brokerAddress,
 GROUP_ID_CONFIG, "groupId",
 AUTO_OFFSET_RESET_CONFIG, "earliest",
 ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
 );
 }
 
 public boolean isStopTest() {
 return stopTest;
 }
 
 public void setStopTest(boolean stopTest) {
 this.stopTest = stopTest;
 }
 }

***simple unit test*** 


 public class TestRicezioneMessaggiCasellaPostale {
 private static final Logger logger = LoggerFactory.getLogger(TestRicezioneMessaggiCasellaPostale.class.getName());
 
 @Test
 public void testRicezioneMessaggiMail() {
 try {
 
 String brokerAddress = "localhost:9092";
 DynamicKafkaConsumer consumer = new DynamicKafkaConsumer(brokerAddress, "f586caf2-ffdc-4e3a-88b9-a262a502f8ac");
 consumer.start();
 boolean stopTest = consumer.isStopTest();
 while (!stopTest) {
 
 stopTest = consumer.isStopTest();
 }
 } catch (Exception e) {
 logger.error("Errore nella configurazione della casella postale; {}", e.getMessage(), e);
 }
 }
 }

In the consumer side I can't read any message; note that the topic "f586caf2-ffdc-4e3a-88b9-a262a502f8ac" exsists and it's the same topic used on the producer side.

When I send a message on the producer side I can see this log:

> 2020-02-19 22:00:22,320 52822 [kafka-producer-network-thread |
> producer-1] TRACE i.e.t.r.p.w.b.s.i.WebmailKafkaTopicSvcImpl - Inviato
> messaggio=[RicezioneMailMessage{pk='c5c8f8a4-8ddd-407a-9e51-f6b14d84f304',
> tipoMessaggio='mail'}] con offset=[0]



On the consumer side I don't see any message. I just see the following prints:

> 2020-02-19 22:00:03,194 1442 [main] INFO 
> o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
> 	allow.auto.create.topics = false 	auto.commit.interval.ms = 5000
> 	auto.offset.reset = earliest 	bootstrap.servers = [localhost:9092]
> 	check.crcs = true 	client.dns.lookup = default 	client.id = 
> 	client.rack = 	connections.max.idle.ms = 540000
> 	default.api.timeout.ms = 60000 	enable.auto.commit = false
> 	exclude.internal.topics = true 	fetch.max.bytes = 52428800
> 	fetch.max.wait.ms = 500 	fetch.min.bytes = 1 	group.id = groupId
> 	group.instance.id = null 	heartbeat.interval.ms = 3000
> 	interceptor.classes = [] 	internal.leave.group.on.close = true
> 	isolation.level = read_uncommitted 	key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> 	max.partition.fetch.bytes = 1048576 	max.poll.interval.ms = 300000
> 	max.poll.records = 500 	metadata.max.age.ms = 300000
> 	metric.reporters = [] 	metrics.num.samples = 2
> 	metrics.recording.level = INFO 	metrics.sample.window.ms = 30000
> 	partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor] 	receive.buffer.bytes
> = 65536 	reconnect.backoff.max.ms = 1000 	reconnect.backoff.ms = 50 	request.timeout.ms = 30000 	retry.backoff.ms = 100
> 	sasl.client.callback.handler.class = null 	sasl.jaas.config = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	sasl.kerberos.service.name = null 	sasl.kerberos.ticket.renew.jitter
> = 0.05 	sasl.kerberos.ticket.renew.window.factor = 0.8 	sasl.login.callback.handler.class = null 	sasl.login.class = null
> 	sasl.login.refresh.buffer.seconds = 300
> 	sasl.login.refresh.min.period.seconds = 60
> 	sasl.login.refresh.window.factor = 0.8
> 	sasl.login.refresh.window.jitter = 0.05 	sasl.mechanism = GSSAPI
> 	security.protocol = PLAINTEXT 	security.providers = null
> 	send.buffer.bytes = 131072 	session.timeout.ms = 10000
> 	ssl.cipher.suites = null 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1,
> TLSv1] 	ssl.endpoint.identification.algorithm = https
> 	ssl.key.password = null 	ssl.keymanager.algorithm = SunX509
> 	ssl.keystore.location = null 	ssl.keystore.password = null
> 	ssl.keystore.type = JKS 	ssl.protocol = TLS 	ssl.provider = null
> 	ssl.secure.random.implementation = null 	ssl.trustmanager.algorithm =
> PKIX 	ssl.truststore.location = null 	ssl.truststore.password = null
> 	ssl.truststore.type = JKS 	value.deserializer = class
> org.springframework.kafka.support.serializer.JsonDeserializer 
> 2020-02-19 22:00:03,630 1878 [main] INFO 
> o.a.k.common.utils.AppInfoParser - Kafka version: 2.4.0 2020-02-19
> 22:00:03,630 1878 [main] INFO o.a.k.common.utils.AppInfoParser -
> Kafka commitId: 77a89fcf8d7fa018 2020-02-19 22:00:03,630 1878 [main]
> INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs:
> 1582146003626 2020-02-19 22:00:03,636 1884 [main] INFO 
> o.a.k.c.consumer.KafkaConsumer - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Subscribed to topic(s):
> f586caf2-ffdc-4e3a-88b9-a262a502f8ac 2020-02-19 22:00:03,645 1893
> [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing
> ExecutorService 2020-02-19 22:00:03,667 1915 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:04,123 2371 [consumer-0-C-1] INFO 
> org.apache.kafka.clients.Metadata - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Cluster ID:
> hOOJH-WNTNiXD4il0Y7_0Q 2020-02-19 22:00:05,052 3300 [consumer-0-C-1]
> INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Discovered group
> coordinator localhost:9092 (id: 2147483647 rack: null) 2020-02-19
> 22:00:05,059 3307 [consumer-0-C-1] INFO 
> o.a.k.c.c.i.AbstractCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group 
> 2020-02-19 22:00:05,116 3364 [consumer-0-C-1] INFO 
> o.a.k.c.c.i.AbstractCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group 
> 2020-02-19 22:00:05,154 3402 [consumer-0-C-1] INFO 
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Finished assignment for
> group at generation 1:
> {consumer-groupId-1-41df9153-7c33-46b1-8274-2d7ee2bfb35c=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@a95df1b}
> 2020-02-19 22:00:05,327 3575 [consumer-0-C-1] INFO 
> o.a.k.c.c.i.AbstractCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Successfully joined
> group with generation 1 2020-02-19 22:00:05,335 3583 [consumer-0-C-1]
> INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Adding newly assigned
> partitions: f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0 2020-02-19
> 22:00:05,363 3611 [consumer-0-C-1] INFO 
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer-groupId-1, groupId=groupId] Found no committed
> offset for partition f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0 
> 2020-02-19 22:00:05,401 3649 [consumer-0-C-1] INFO 
> o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-groupId-1,
> groupId=groupId] Resetting offset for partition
> f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0 to offset 0. 2020-02-19
> 22:00:05,404 3652 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing on
> assignment:
> {f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0=OffsetAndMetadata{offset=0,
> leaderEpoch=null, metadata=''}} 2020-02-19 22:00:05,432 3680
> [consumer-0-C-1] INFO o.s.k.l.ConcurrentMessageListenerContainer -
> groupId: partitions assigned: [f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0]
> 2020-02-19 22:00:08,669 6917 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:08,670 6918 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:13,671 11919 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:13,671 11919 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:18,673 16921 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:18,673 16921 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:23,674 21922 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:23,674 21922 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:28,676 26924 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:28,676 26924 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:33,677 31925 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:33,677 31925 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:38,678 36926 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:38,678 36926 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {} 2020-02-19 22:00:43,678 41926 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0
> records 2020-02-19 22:00:43,679 41927 [consumer-0-C-1] DEBUG
> o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list:
> {}


Can anybody tell me where I'm wrong?

Thank you

Angelo