
I wrote a Kafka producer and consumer for my app:

Consumer config:

@EnableKafka
@Configuration
class KafkaConsumerConfig {
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
        props[ConsumerConfig.GROUP_ID_CONFIG] = "group12345"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}

Producer config:

@Configuration
class KafkaProducerConfig {
    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val configProps: MutableMap<String, Any> = HashMap()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }
}

Topic config:

@Configuration
class KafkaTopicConfig {
    @Bean
    fun kafkaAdmin(): KafkaAdmin {
        val configs: MutableMap<String, Any?> = HashMap()
        configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
        return KafkaAdmin(configs)
    }

    @Bean
    fun topic1(): NewTopic {
        return NewTopic("kafkaTest", 1, 1.toShort())
    }
}

Kafka service:

@Service
class KafkaService(
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    fun send() {
        kafkaTemplate.send("kafkaTest", "test message ${System.currentTimeMillis()}")
    }

    @KafkaListener(topics = ["kafkaTest"], groupId = "group12345")
    fun listenGroupFoo(message: String) {
        println("--> $message")
    }
}

Those are all the classes in my app. When I try to run it, I get this exception:

2021-10-11 17:20:13.319 WARN 8544 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Error connecting to node 34bcfcc207e0:9092 (id: 1001 rack: null)

java.net.UnknownHostException: 34bcfcc207e0

I have no idea what the host 34bcfcc207e0 is. It appears at the start of the thread.

What's wrong?

  • Try adding "localhost:2181" to your "bootstrapservers" in the consumer config. This is to add consumers to ZooKeeper for consumer rebalancing and registering with ZooKeeper. Commented Oct 11, 2021 at 14:44
  • Is the Kafka broker running inside Docker? Is it running inside a Docker container with container ID 34bcfcc207e0? Commented Oct 11, 2021 at 14:45
  • It's called a bootstrap server for just that reason: the client initially connects to that address, and the server returns the actual host name(s) to connect to. When using Docker, you need to configure Kafka's advertised hosts within the container. Commented Oct 11, 2021 at 14:47
  • Yes, it is the container name... Commented Oct 11, 2021 at 14:48
  • @Umeshwaran Kafka clients do not use ZooKeeper as bootstrap servers. Commented Oct 11, 2021 at 14:53

2 Answers

  1. Kafka is not an HTTP service. Remove http:// from all of your bootstrap server strings.

  2. If you're running Kafka in a container, the default advertised listener uses the container's hostname (the container ID). Change it to an address your client can resolve; see "Connect to Kafka running in Docker".
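For the first point, the simplest fix is to write the literal "localhost:9092" in each config class. As a minimal sketch, assuming the address might also arrive from external configuration, a small guard can strip a scheme before the value reaches the client. The function name normalizeBootstrapServers is hypothetical and not part of Kafka or Spring; "bootstrap.servers" is the property name behind the BOOTSTRAP_SERVERS_CONFIG constants.

```kotlin
// Hypothetical helper (not a Kafka or Spring API): removes a URI scheme such as
// "http://" from each entry of a comma-separated bootstrap server list.
fun normalizeBootstrapServers(raw: String): String =
    raw.split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        // substringAfter returns the entry unchanged when "://" is absent.
        .joinToString(",") { it.substringAfter("://") }

fun main() {
    val props: MutableMap<String, Any> = HashMap()
    // Kafka expects plain host:port pairs here, not URLs.
    props["bootstrap.servers"] = normalizeBootstrapServers("http://localhost:9092")
    println(props["bootstrap.servers"]) // prints "localhost:9092"
}
```

This only addresses the scheme. The UnknownHostException itself comes from the advertised listener described in the second point, which has to be fixed on the broker side.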

I had a similar issue using docker-compose. At first I didn't know where that hash ID came from either. Then I realized it was the container ID, which is the default hostname of my broker service.

docker ps

  CONTAINER ID   PORTS                              NAMES
  b6d781c2089b   2181/tcp, 0.0.0.0:9092->9092/tcp   broker-1
* fac6ef279a3e   2181/tcp, 0.0.0.0:9093->9092/tcp   broker-2
  cbbc73bfe92e   0.0.0.0:2181->2181/tcp             zookeeper

log

broker-2 | Recorded new controller, from now on will use broker fac6ef279a3e:9092 

When I tried to connect with my Java consumer using:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093"); 

I got the following error:

java.net.UnknownHostException: fac6ef279a3e 

I solved it as follows.

Adding a hostname to each broker:

services:
  ...
  broker-1:
    hostname: broker-1
    ports:
      - "9092:9092"
  ...
  broker-2:
    hostname: broker-2
    ports:
      - "9093:9092"

Setting my /etc/hosts like this:

#### kafka
127.0.0.1 broker-1
127.0.0.1 broker-2

And updating my connection properties:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"broker-1:9092,broker-2:9093"); 
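To check from the client machine whether the broker host names resolve before starting the consumer, something like the following sketch may help. It assumes plain host:port entries (no IPv6 literals), and unresolvableHosts is a hypothetical helper, not a Kafka API. It should be given the names the brokers advertise, which in this setup are the same as the bootstrap names.

```kotlin
import java.net.InetAddress
import java.net.UnknownHostException

// Hypothetical diagnostic helper: returns the host names from a comma-separated
// "host:port" list that cannot be resolved. The resolver is a parameter so the
// lookup can be replaced in tests; by default it uses the JVM's name resolution.
fun unresolvableHosts(
    hostPorts: String,
    resolve: (String) -> Unit = { host -> InetAddress.getByName(host) }
): List<String> =
    hostPorts.split(",")
        .map { it.trim().substringBeforeLast(":") } // drop the port
        .filter { it.isNotEmpty() }
        .distinct()
        .filter { host ->
            try {
                resolve(host)
                false // resolved, so not a problem
            } catch (e: UnknownHostException) {
                true // the same failure the Kafka client reports
            }
        }

fun main() {
    val missing = unresolvableHosts("broker-1:9092,broker-2:9093")
    if (missing.isEmpty()) {
        println("All broker host names resolve")
    } else {
        println("Add these to /etc/hosts or fix the advertised listeners: $missing")
    }
}
```

Passing the container ID from the error message (for example "fac6ef279a3e:9092") should reproduce the failure without involving Kafka at all, which helps confirm that the problem is name resolution rather than the broker.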
