6

I'm running into an issue with apache Kafka that I don't understand . I subscribe to a topic in my broker called "topic-received" . This is the code :

protected String readResponse(final String idMessage) { if (props != null) { kafkaClient = new KafkaConsumer<>(props); logger.debug("Subscribed to topic-received"); kafkaClient.subscribe(Arrays.asList("topic-received")); logger.debug("Waiting for reading : topic-received"); ConsumerRecords<String, String> records = kafkaClient.poll(kafkaConfig.getRead_timeout()); if (records != null) { for (ConsumerRecord<String, String> record : records) { logger.debug("Resultado devuelto : "+record.value()); return record.value(); } } } return null; } 

As this is happening, I send a message to "topic-received" from another point . The code is the following one :

private void sendMessageToKafkaBroker(String idTopic, String value) { Producer<String, String> producer = null; try { producer = new KafkaProducer<String, String>(mapProperties()); ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("topic-received", value); producer.send(producerRecord); logger.info("Sended value "+value+" to topic-received"); } catch (ExceptionInInitializerError eix) { eix.printStackTrace(); } catch (KafkaException ke) { ke.printStackTrace(); } finally { if (producer != null) { producer.close(); } } } 

First time I try , with topic "topic-received", I get a warning like this

"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {topic- received=LEADER_NOT_AVAILABLE}" 

But if I try again, to this topic "topic-received", works ok, and no warning is presented . Anyway, that's not useful for me, because I have to listen from a topic and send to a topic new each time ( referenced by an String identifier ex: .. 12Erw45-2345Saf-234DASDFasd )

Looking for LEADER_NOT_AVAILABLE in google , some guys talk about adding to server.properties the next lines :

host.name=127.0.0.1 advertised.port=9092 advertised.host.name=127.0.0.1 

But it's not working for me ( Don't know why ) .

I have tried to create the topic before all this process with the following code:

 private void createTopic(String idTopic) { String zookeeperConnect = "localhost:2181"; ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect),false); if(!AdminUtils.topicExists(zkUtils,idTopic)) { AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(), null); logger.debug("Created topic "+idTopic+" by super user"); } else{ logger.debug("topic "+idTopic+" already exists"); } } 

No error, but still, it stays listening till the timeout.

I have reviewed the properties of the broker to check if there's any help, but I haven't found anything clear enough . The props that I have used for reading are :

 props = new Properties(); props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers()); props.put("key.deserializer", kafkaConfig.getKey_deserializer()); props.put("value.deserializer", kafkaConfig.getValue_deserializer()); props.put("key.serializer", kafkaConfig.getKey_serializer()); props.put("value.serializer", kafkaConfig.getValue_serializer()); props.put("group.id",kafkaConfig.getGroupId()); 

and , for sending ...

 Properties props = new Properties(); props.put("bootstrap.servers", kafkaConfig.getHost() + ":" + kafkaConfig.getPort()); props.put("group.id", kafkaConfig.getGroup_id()); props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit()); props.put("auto.commit.interval.ms", kafkaConfig.getAuto_commit_interval_ms()); props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms()); props.put("key.deserializer", kafkaConfig.getKey_deserializer()); props.put("value.deserializer", kafkaConfig.getValue_deserializer()); props.put("key.serializer", kafkaConfig.getKey_serializer()); props.put("value.serializer", kafkaConfig.getValue_serializer()); 

Any clue ? Why , the only way that I have to consume messages from the broker and from the topic, is repeating the request after an error ?

Thanks in advance

4
  • Consuming the message from windows console , I get a kafka.common.NotLeaderForPartitionException Commented Sep 27, 2016 at 10:59
  • I am not sure, what the exact problem is... Furthermore, I guess it's not an error, but just a warning: WARN 13164, right? Commented Sep 28, 2016 at 0:34
  • Yes . it's not exactly an error . It's a warning , but the consumer stands without reading any message . On the other hand, when I try to consume the broker from the console, I get kafka.common.NotLeaderForPartitionException . If I use a well stablished topic , I have no error, and this warning doesn't appear . Commented Sep 28, 2016 at 14:01
  • As the warning implies Kafka has not elected a leader yet. This is why you say there is no error in a well-established topic. Create the topic, then use the tool kafka-topics.sh with --describe option. If you see a leader in the output, you won't get any warnings. Commented Nov 29, 2016 at 11:35

2 Answers 2

15

This happens when trying to produce messages to a topic that doesn't exist

PLEASE NOTE: In some Kafka installations, the framework can automatically create the topic when it doesn't exist, that explains why you see the issue only once at the very beginning.

Sign up to request clarification or add additional context in comments.

Comments

1

This error appears when your Topic name doesn't exist.

To list all topics execute following command:

kafka-topics --list --zookeeper localhost:2181 

1 Comment

topic does exist in my case

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.