
I am trying to set up a Kafka cluster (the first node in the cluster, actually).

I have a single-node ZooKeeper cluster set up. I am setting up Kafka on a separate node.

Both are running CentOS 6.4 over IPv6, which is a bit of a PITA. I verified that the machines can talk to each other using netcat.

When I start up Kafka, I get the following exception (which causes Kafka to shut down). EDIT: I got Kafka starting; I had to set the host.name property in config/server.properties.
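For reference, the host.name fix amounted to a couple of lines in the broker config; a sketch of the relevant properties, assuming the hostnames kafka1 and zk1 used elsewhere in this question:

```properties
# config/server.properties (sketch - hostnames are from my /etc/hosts)
host.name=kafka1
zookeeper.connect=zk1:2181
```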

I was able to create a test topic and send messages just fine from the kafka server.

However, I get the same error when trying to consume the messages.

Any help, suggestions?

bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
Exception in thread "main" java.net.UnknownHostException: kafka: kafka: Name or service not known
	at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
	at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
	at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
	at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
	at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:178)
	at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.net.UnknownHostException: kafka: Name or service not known
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
	at java.net.InetAddress.getLocalHost(InetAddress.java:1469)
	... 5 more
  • Internet connection, URL are correct? Commented Aug 26, 2014 at 3:32
  • Well, I know that I can communicate from the KAFKA server to ZOOKEEPER because a leader was elected, I created a topic, and posted a sample message. It is consuming messages that isn't working. There is some configuration someplace that isn't right here, but I am not clear on what that is. Commented Aug 26, 2014 at 11:49
  • So I am assuming that config/server.properties does have the host.name property set properly. What does your /etc/hosts file show? Commented Aug 26, 2014 at 12:09
  • host.name=kafka1; zookeeper.connect=zk1:2181. I have entries in the /etc/hosts file for both "kafka1" and "zk1" with the correct IPV6 addresses. Can confirm that both servers can talk to each other by using PING and "nc -z -w 1 kafka1 9092" and "nc -z -w 1 zk1 2181" Commented Aug 26, 2014 at 12:53
  • I am thinking there is something on the zookeeper side that I didn't setup correctly? When I see "UnknownHostException: kafka", it looks like "kafka" is a default setup someplace, and of course that is not right. Commented Aug 26, 2014 at 12:55

5 Answers


When you run the bin/kafka-console-consumer.sh command, Kafka loads a ConsoleConsumer, which will attempt to create a consumer with an auto-generated consumer id. The way Kafka generates the consumer id is to concatenate the name of the local host to it. So the problem was that Java could not resolve the IP address for the local host on the OpenStack VM I am working with.

So the answer was that the OpenStack VM was resolving the local host name to kafka, which is the name of the VM. I had everything set up in the Kafka and ZooKeeper instances as kafka1.

So, when Java was calling getLocalHost, it was trying to find the IP address for kafka, which I did not have in my /etc/hosts file.

I simply added an entry for kafka to my /etc/hosts file and everything started working wonderfully!

I would have thought it would resolve to localhost, but it did not; it resolved to the name of the VM, kafka.
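A quick way to see what the JVM will resolve, before touching any Kafka config, is a one-off snippet like this (a sketch; the printed values depend entirely on the machine's hostname setup):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class LocalHostCheck {
    public static void main(String[] args) {
        try {
            // This is the same call the console consumer makes internally.
            InetAddress local = InetAddress.getLocalHost();
            System.out.println("hostname: " + local.getHostName());
            System.out.println("address:  " + local.getHostAddress());
        } catch (UnknownHostException e) {
            // This is exactly the failure seen in the question's stack trace.
            System.out.println("getLocalHost failed: " + e.getMessage());
        }
    }
}
```

If this prints the VM's name (e.g. kafka) rather than localhost, that name must appear in /etc/hosts for the consumer to start.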


4 Comments

So how does your kafka entry look now?
how does your entry look?
127.0.0.1 localhost 127.0.0.1 ip-172-31-0-252
Editing the hosts file is only a workaround - the actual fix is to set advertised.listeners correctly - see rmoff.net/2018/08/02/kafka-listeners-explained

As noplay pointed out, the issue was that Kafka wasn't able to resolve the correct IP. This may happen, for example, on your EC2 instances running in private subnets without a public IP assigned. The solution, summarized:

hostname 

This will show you the host name, something like ip-10-180-128-217. Then just update your /etc/hosts:

sudo nano /etc/hosts 

edit, e.g.

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ip-10-180-128-217 
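The two steps above can be collapsed into a quick check; a sketch that verifies whether the name returned by hostname actually resolves, which is what Kafka's getLocalHost() call needs:

```shell
# Sketch: does our own hostname resolve? If not, it belongs in /etc/hosts.
h="$(hostname)"
if getent hosts "$h" >/dev/null; then
    echo "$h resolves - Kafka's getLocalHost() should succeed"
else
    echo "$h does NOT resolve - add it to /etc/hosts"
fi
```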



First edit your hosts file (sudo vim /etc/hosts); it should look like this:

127.0.0.1 kafka localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain <localhost_OR_DNS_OR_ip_address>
<localhost_OR_DNS_OR_ip_address> kafka

Note that kafka is my desired hostname.

Then, under

kafka_folder/config/server.properties

there's a field "listeners=".

It looks like this:

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092

Simply uncomment that field by removing the "#" before it. It should look like this:

listeners=PLAINTEXT://<localhost_OR_DNS_OR_ip_address>:9092
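When clients connect from other machines, the address the broker advertises matters as much as the address it binds to. A hedged sketch of the pair of settings (kafka1.example.com is a placeholder for a name your clients can actually resolve):

```properties
# Bind on all interfaces, but advertise a name clients can resolve.
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1.example.com:9092
```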


If you have a DNS server configured, Kafka will use your DNS domain name, so type this command on your server:

hostname

The result will be your localdomain. Copy your localdomain, then open the hosts file:

nano /etc/hosts

and add your localdomain:

127.0.0.1 localhost localhost.localdomain "your localdomain"
::1 localhost localhost.localdomain "your localdomain"



When working with Kafka, especially in environments where the default host resolution setup might not work (e.g., when the /etc/hosts file cannot be modified), configuring DNS for brokers can be challenging. To address this, I implemented a custom DNS resolver that integrates directly into the Kafka producer. While it might not be the most elegant solution, it works effectively for both producers and consumers.

Implementation

Custom Producer with DNS Resolver

The producer uses a utility method to inject the custom DNS resolver. If enabled, it ensures the broker hostname resolves to the configured IP:

public abstract class CustomAvroProducer<T> extends AbstractKafkaClient implements CustomProducer<T> {

    private static final Logger LOG = LogManager.getLogger(CustomAvroProducer.class);

    private final List<T> records = Collections.synchronizedList(new ArrayList<>());
    private final KafkaProducer<String, T> producer = new KafkaProducer<>(getProperties());

    public CustomAvroProducer(KafkaTopic topic) {
        super(topic);
        configureCustomDnsResolver();
    }

    private void configureCustomDnsResolver() {
        if (BROKER_DNS_ENABLE) { // Remove if need
            try {
                // DEFAULT_BROKER_HOST = kafka
                // BROKER_IP = 10.1.1.9
                KafkaUtil.injectDnsResolverForBroker(producer, DEFAULT_BROKER_HOST, BROKER_IP);
                LOG.info("Custom DNS resolver activated for broker: {}", BROKER_IP);
            } catch (Exception e) {
                LOG.error("Failed to add DNS for hostname '{}', broker IP '{}'", DEFAULT_BROKER_HOST, BROKER_IP, e);
            }
        }
    }
    ...
}

/**
 * Utility class for Kafka-related operations.
 * This class provides methods to inject a custom DNS resolver
 * into the Kafka producer, ensuring the broker name is recognized
 * and messages are sent correctly.
 */
public class KafkaUtil {

    /**
     * Injects a custom DNS resolver into the Kafka producer,
     * configuring the broker's IP address.
     *
     * @param kafka         The Kafka producer where the DNS resolver will be injected.
     * @param brokerDefault The default broker name to be used by the DNS resolver.
     * @param brokerIp      The IP address of the Kafka broker.
     * @throws NoSuchFieldException   If one of the expected fields is not found.
     * @throws IllegalAccessException If access to the field is not allowed.
     */
    public static <T> void injectDnsResolverForBroker(KafkaProducer<String, T> kafka,
                                                      String brokerDefault, String brokerIp)
            throws NoSuchFieldException, IllegalAccessException {
        Object connectionStates = getProducerConnectionStates(kafka);
        Field hostResolver = connectionStates.getClass().getDeclaredField("hostResolver");
        hostResolver.setAccessible(true);
        hostResolver.set(connectionStates, new CustomDnsResolver(brokerDefault, brokerIp));
    }

    private static <T> Object getProducerConnectionStates(KafkaProducer<String, T> kafka)
            throws NoSuchFieldException, IllegalAccessException {
        Field senderField = KafkaProducer.class.getDeclaredField("sender");
        senderField.setAccessible(true);
        Sender sender = (Sender) senderField.get(kafka);

        Field clientField = Sender.class.getDeclaredField("client");
        clientField.setAccessible(true);
        NetworkClient networkClient = (NetworkClient) clientField.get(sender);

        Field networkField = NetworkClient.class.getDeclaredField("connectionStates");
        networkField.setAccessible(true);
        return networkField.get(networkClient);
    }
}

public class CustomDnsResolver implements HostResolver {

    private static final Logger LOG = LogManager.getLogger(CustomDnsResolver.class);

    protected final String brokerIp;
    protected final boolean isBrokerIpConfigured;
    protected final String defaultBrokerHost;

    public CustomDnsResolver(String defaultBrokerHost, String brokerIp) {
        this.brokerIp = brokerIp;
        this.defaultBrokerHost = defaultBrokerHost;
        this.isBrokerIpConfigured = Strings.isNotEmpty(brokerIp);
    }

    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        if (isBrokerHost(host)) {
            LOG.info("Resolving host '{}' to IP: {}", defaultBrokerHost, brokerIp);
            return InetAddress.getAllByName(brokerIp);
        }
        return InetAddress.getAllByName(host);
    }

    private boolean isBrokerHost(String host) {
        return host.equals(defaultBrokerHost) && this.isBrokerIpConfigured;
    }
}
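The core of the resolver can be tried in isolation, without the Kafka client or the reflection plumbing. A standalone sketch of the same override logic, where "kafka" and "127.0.0.1" are placeholder values for the broker host and IP:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Standalone sketch of the resolver's decision logic: a matching broker
// hostname is redirected to a fixed IP, everything else resolves normally.
public class DnsOverrideDemo {
    static final String BROKER_HOST = "kafka";     // placeholder broker name
    static final String BROKER_IP = "127.0.0.1";   // placeholder broker IP

    static InetAddress[] resolve(String host) throws UnknownHostException {
        if (BROKER_HOST.equals(host)) {
            return InetAddress.getAllByName(BROKER_IP); // override
        }
        return InetAddress.getAllByName(host);          // normal lookup
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(resolve("kafka")[0].getHostAddress());     // prints 127.0.0.1
        System.out.println(resolve("localhost")[0].getHostAddress()); // system-dependent loopback
    }
}
```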

