I'm running into an issue with apache Kafka that I don't understand . I subscribe to a topic in my broker called "topic-received" . This is the code :
protected String readResponse(final String idMessage) { if (props != null) { kafkaClient = new KafkaConsumer<>(props); logger.debug("Subscribed to topic-received"); kafkaClient.subscribe(Arrays.asList("topic-received")); logger.debug("Waiting for reading : topic-received"); ConsumerRecords<String, String> records = kafkaClient.poll(kafkaConfig.getRead_timeout()); if (records != null) { for (ConsumerRecord<String, String> record : records) { logger.debug("Resultado devuelto : "+record.value()); return record.value(); } } } return null; } As this is happening, I send a message to "topic-received" from another point . The code is the following one :
private void sendMessageToKafkaBroker(String idTopic, String value) { Producer<String, String> producer = null; try { producer = new KafkaProducer<String, String>(mapProperties()); ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("topic-received", value); producer.send(producerRecord); logger.info("Sended value "+value+" to topic-received"); } catch (ExceptionInInitializerError eix) { eix.printStackTrace(); } catch (KafkaException ke) { ke.printStackTrace(); } finally { if (producer != null) { producer.close(); } } } First time I try , with topic "topic-received", I get a warning like this
"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {topic- received=LEADER_NOT_AVAILABLE}" But if I try again, to this topic "topic-received", works ok, and no warning is presented . Anyway, that's not useful for me, because I have to listen from a topic and send to a topic new each time ( referenced by an String identifier ex: .. 12Erw45-2345Saf-234DASDFasd )
Looking for LEADER_NOT_AVAILABLE in google , some guys talk about adding to server.properties the next lines :
host.name=127.0.0.1 advertised.port=9092 advertised.host.name=127.0.0.1 But it's not working for me ( Don't know why ) .
I have tried to create the topic before all this process with the following code:
private void createTopic(String idTopic) { String zookeeperConnect = "localhost:2181"; ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect),false); if(!AdminUtils.topicExists(zkUtils,idTopic)) { AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(), null); logger.debug("Created topic "+idTopic+" by super user"); } else{ logger.debug("topic "+idTopic+" already exists"); } } No error, but still, it stays listening till the timeout.
I have reviewed the properties of the broker to check if there's any help, but I haven't found anything clear enough . The props that I have used for reading are :
props = new Properties(); props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers()); props.put("key.deserializer", kafkaConfig.getKey_deserializer()); props.put("value.deserializer", kafkaConfig.getValue_deserializer()); props.put("key.serializer", kafkaConfig.getKey_serializer()); props.put("value.serializer", kafkaConfig.getValue_serializer()); props.put("group.id",kafkaConfig.getGroupId()); and , for sending ...
Properties props = new Properties(); props.put("bootstrap.servers", kafkaConfig.getHost() + ":" + kafkaConfig.getPort()); props.put("group.id", kafkaConfig.getGroup_id()); props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit()); props.put("auto.commit.interval.ms", kafkaConfig.getAuto_commit_interval_ms()); props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms()); props.put("key.deserializer", kafkaConfig.getKey_deserializer()); props.put("value.deserializer", kafkaConfig.getValue_deserializer()); props.put("key.serializer", kafkaConfig.getKey_serializer()); props.put("value.serializer", kafkaConfig.getValue_serializer()); Any clue ? Why , the only way that I have to consume messages from the broker and from the topic, is repeating the request after an error ?
Thanks in advance
WARN 13164, right?