I wrote a Kafka producer and consumer for my app.
Consumer config:

    @EnableKafka
    @Configuration
    class KafkaConsumerConfig {
        @Bean
        fun consumerFactory(): ConsumerFactory<String, String> {
            val props: MutableMap<String, Any> = HashMap()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
            props[ConsumerConfig.GROUP_ID_CONFIG] = "group12345"
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            return DefaultKafkaConsumerFactory(props)
        }

        @Bean
        fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
            val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
            factory.consumerFactory = consumerFactory()
            return factory
        }
    }

Producer config:

    @Configuration
    class KafkaProducerConfig {
        @Bean
        fun producerFactory(): ProducerFactory<String, String> {
            val configProps: MutableMap<String, Any> = HashMap()
            configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
            configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            return DefaultKafkaProducerFactory(configProps)
        }

        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory())
        }
    }

Topic config:

    @Configuration
    class KafkaTopicConfig {
        @Bean
        fun kafkaAdmin(): KafkaAdmin {
            val configs: MutableMap<String, Any?> = HashMap()
            configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
            return KafkaAdmin(configs)
        }

        @Bean
        fun topic1(): NewTopic {
            return NewTopic("kafkaTest", 1, 1.toShort())
        }
    }

Kafka service:

    @Service
    class KafkaService(
        private val kafkaTemplate: KafkaTemplate<String, String>
    ) {
        fun send() {
            kafkaTemplate.send("kafkaTest", "test message ${System.currentTimeMillis()}")
        }

        @KafkaListener(topics = ["kafkaTest"], groupId = "group12345")
        fun listenGroupFoo(message: String) {
            println("--> $message")
        }
    }

Those are all the classes in my app. When I try to run the app, I get this exception:
2021-10-11 17:20:13.319 WARN 8544 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Error connecting to node 34bcfcc207e0:9092 (id: 1001 rack: null)
java.net.UnknownHostException: 34bcfcc207e0
I have no idea what the host 34bcfcc207e0 is. The warning appears at startup, on the adminclient-1 thread.
What's wrong?
