When working with Kafka in environments where the default host resolution does not work (for example, when the /etc/hosts file cannot be modified), getting clients to resolve broker hostnames can be challenging. To address this, I implemented a custom DNS resolver that is injected directly into the Kafka producer. It is not the most elegant solution, but it works for both producers and consumers.
Implementation
Custom Producer with DNS Resolver
The producer calls a utility method to inject the custom DNS resolver. When the BROKER_DNS_ENABLE flag is set, the broker hostname resolves to the configured IP:
public abstract class CustomAvroProducer<T> extends AbstractKafkaClient implements CustomProducer<T> {

    private static final Logger LOG = LogManager.getLogger(CustomAvroProducer.class);

    private final List<T> records = Collections.synchronizedList(new ArrayList<>());
    private final KafkaProducer<String, T> producer = new KafkaProducer<>(getProperties());

    public CustomAvroProducer(KafkaTopic topic) {
        super(topic);
        configureCustomDnsResolver();
    }

    private void configureCustomDnsResolver() {
        if (BROKER_DNS_ENABLE) { // Remove this check if it is not needed
            try {
                // DEFAULT_BROKER_HOST = kafka
                // BROKER_IP = 10.1.1.9
                KafkaUtil.injectDnsResolverForBroker(producer, DEFAULT_BROKER_HOST, BROKER_IP);
                LOG.info("Custom DNS resolver activated for broker: {}", BROKER_IP);
            } catch (Exception e) {
                LOG.error("Failed to add DNS for hostname '{}', broker IP '{}'", DEFAULT_BROKER_HOST, BROKER_IP, e);
            }
        }
    }

    ...
}

import java.lang.reflect.Field;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.Sender;

/**
 * Utility class for Kafka-related operations.
 * This class provides methods to inject a custom DNS resolver
 * into the Kafka producer, ensuring the broker name is recognized
 * and messages are sent correctly.
 */
public class KafkaUtil {

    /**
     * Injects a custom DNS resolver into the Kafka producer,
     * configuring the broker's IP address.
     *
     * @param kafka The Kafka producer where the DNS resolver will be injected.
     * @param brokerDefault The default broker name to be used by the DNS resolver.
     * @param brokerIp The IP address of the Kafka broker.
     * @throws NoSuchFieldException If one of the expected fields is not found.
     * @throws IllegalAccessException If access to the field is not allowed.
     */
    public static <T> void injectDnsResolverForBroker(KafkaProducer<String, T> kafka, String brokerDefault, String brokerIp)
            throws NoSuchFieldException, IllegalAccessException {
        Object connectionStates = getProducerConnectionStates(kafka);
        // Replace the private hostResolver field with the custom resolver
        Field hostResolver = connectionStates.getClass().getDeclaredField("hostResolver");
        hostResolver.setAccessible(true);
        hostResolver.set(connectionStates, new CustomDnsResolver(brokerDefault, brokerIp));
    }

    // Walks the private fields producer -> sender -> client -> connectionStates
    private static <T> Object getProducerConnectionStates(KafkaProducer<String, T> kafka)
            throws NoSuchFieldException, IllegalAccessException {
        Field senderField = KafkaProducer.class.getDeclaredField("sender");
        senderField.setAccessible(true);
        Sender sender = (Sender) senderField.get(kafka);

        Field clientField = Sender.class.getDeclaredField("client");
        clientField.setAccessible(true);
        NetworkClient networkClient = (NetworkClient) clientField.get(sender);

        Field networkField = NetworkClient.class.getDeclaredField("connectionStates");
        networkField.setAccessible(true);
        return networkField.get(networkClient);
    }
}

import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.kafka.clients.HostResolver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;

public class CustomDnsResolver implements HostResolver {

    private static final Logger LOG = LogManager.getLogger(CustomDnsResolver.class);

    protected final String brokerIp;
    protected final boolean isBrokerIpConfigured;
    protected final String defaultBrokerHost;

    public CustomDnsResolver(String defaultBrokerHost, String brokerIp) {
        this.brokerIp = brokerIp;
        this.defaultBrokerHost = defaultBrokerHost;
        this.isBrokerIpConfigured = Strings.isNotEmpty(brokerIp);
    }

    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        if (isBrokerHost(host)) {
            LOG.info("Resolving host '{}' to IP: {}", defaultBrokerHost, brokerIp);
            return InetAddress.getAllByName(brokerIp);
        }
        // Any other host falls back to the normal lookup
        return InetAddress.getAllByName(host);
    }

    private boolean isBrokerHost(String host) {
        return host.equals(defaultBrokerHost) && this.isBrokerIpConfigured;
    }
}
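To try the idea without a Kafka dependency, the same two steps (a resolver that maps one hostname to a fixed IP, and a reflective swap of a private field) can be sketched in isolation. This is only an illustration under stated assumptions: Resolver, StaticHostResolver, FakeConnectionStates and DnsInjectionDemo are hypothetical stand-ins, not Kafka classes, and the sketch assumes the JVM still permits reflective writes to final instance fields.

```java
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;

class DnsInjectionDemo {
    // Replaces the private resolver field via reflection, mirroring the utility above.
    static void inject(FakeConnectionStates states, Resolver replacement)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = FakeConnectionStates.class.getDeclaredField("hostResolver");
        field.setAccessible(true);
        field.set(states, replacement);
    }

    public static void main(String[] args) throws Exception {
        // Start with plain lookup, then swap in the fixed mapping.
        FakeConnectionStates states = new FakeConnectionStates(InetAddress::getAllByName);
        inject(states, new StaticHostResolver("kafka", "10.1.1.9"));
        System.out.println(states.lookup("kafka")[0].getHostAddress());
    }
}

// Hypothetical stand-in for Kafka's HostResolver, so the sketch compiles on its own.
interface Resolver {
    InetAddress[] resolve(String host) throws UnknownHostException;
}

// Maps one configured hostname to a fixed IP; every other host uses the normal lookup.
class StaticHostResolver implements Resolver {
    private final String host;
    private final String ip;

    StaticHostResolver(String host, String ip) {
        this.host = host;
        this.ip = ip;
    }

    @Override
    public InetAddress[] resolve(String requested) throws UnknownHostException {
        // An IP literal is parsed directly, so no DNS query is made for it.
        String target = host.equals(requested) ? ip : requested;
        return InetAddress.getAllByName(target);
    }
}

// Hypothetical stand-in for the internal object that keeps the resolver in a private final field.
class FakeConnectionStates {
    private final Resolver hostResolver;

    FakeConnectionStates(Resolver hostResolver) {
        this.hostResolver = hostResolver;
    }

    InetAddress[] lookup(String host) throws UnknownHostException {
        return hostResolver.resolve(host);
    }
}
```

Running DnsInjectionDemo should print the configured IP for the hostname kafka. Because the real approach depends on private field names inside the Kafka client, it is worth re-checking after every client upgrade.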
config/server.properties does have the host.name property set properly. What does your /etc/hosts file show?