
I have a large Confluent Kafka deployment comprising multiple sub-clusters: one for Zookeeper, another for the Kafka brokers with Schema Registry and KSQL streams, and one cluster for Connect.

My Connect cluster has been having issues since I configured rest.advertised.host.name on all of my worker instances to the FQDN, as per the article here -

The following are the errors I am continuously seeing in the Connect distributed log files on all nodes -

connectDistributed.out

Error 1-

[2021-08-12 14:07:48,932] INFO [Consumer clientId=connector-consumer-XYZ-0, groupId=connect-XYZ] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1054) 

Error 2-

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-coordinator-heartbeat-thread | connect-XYZ" 

Following are the connect worker properties -

bootstrap.servers=production-kafka-elb.int.supportabc.platform.co.uk:9092
group.id=connect-cluster-cc
connect.protocol=compatible
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-cc
offset.storage.replication.factor=5
config.storage.topic=connect-configs-cc
config.storage.replication.factor=5
status.storage.topic=connect-status-cc
status.storage.replication.factor=5
offset.flush.interval.ms=10000
rest.port=8085
rest.advertised.host.name=bblpkaa011.int.supportabc.platform.co.uk
rest.advertised.port=8085
plugin.path=/usr/share/java,/apps/confluent-5.5.1/share/java/
key.converter.schema.registry.url=abc-production-kafka-elb.int.supportabc.platform.co.uk:8081
value.converter.schema.registry.url=abc-production-kafka-elb.int.supportabc.platform.co.uk:8081

I have confirmed that each worker has 6 GB of heap assigned to it -

See the process trace -

java -Xmx6G -Xms6G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:MetaspaceSize=96m -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/apps/confluent-5.5.1/bin/../logs -Dlog4j.configuration=file:/apps/confluent-5.5.1/bin/../etc/kafka/connect-log4j.properties -cp /apps/confluent-5.5.1/share/java/confluent-security/connect/*:/apps/confluent-5.5.1/share/java/kafka/*:/apps/confluent-5.5.1/share/java/confluent-common/*:/apps/confluent-5.5.1/share/java/kafka-serde-tools/*:/apps/confluent-5.5.1/share/java/monitoring-interceptors/*:/apps/confluent-5.5.1/bin/../ce-broker-plugins/build/libs/*:/apps/confluent-5.5.1/bin/../ce-broker-plugins/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-auth-providers/build/libs/*:/apps/confluent-5.5.1/bin/../ce-auth-providers/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-rest-server/build/libs/*:/apps/confluent-5.5.1/bin/../ce-rest-server/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-audit/build/libs/*:/apps/confluent-5.5.1/bin/../ce-audit/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../share/java/kafka/*:/apps/confluent-5.5.1/bin/../share/java/confluent-metadata-service/*:/apps/confluent-5.5.1/bin/../share/java/rest-utils/*:/apps/confluent-5.5.1/bin/../share/java/confluent-common/*:/apps/confluent-5.5.1/bin/../share/java/confluent-security/schema-validator/*:/apps/confluent-5.5.1/bin/../support-metrics-client/build/dependant-libs-2.12.10/*:/apps/confluent-5.5.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/*:/apps/confluent-5.5.1/bin/../support-metrics-fullcollector/build/dependant-libs-2.12.10/*:/apps/confluent-5.5.1/bin/../support-metrics-fullcollector/build/libs/*:/usr/share/java/support-metrics-fullcollector/* -javaagent:/apps/ad/java-agent-20.9.0.30985-latest/javaagent.jar org.apache.kafka.connect.cli.ConnectDistributed /apps/confluent-5.5.1/etc/kafka/connect-distributed-worker-cc.properties

How can I solve this issue?

2 Answers


What actually happened!

On the Connect cluster, this appears when all nodes have entered what Confluent calls a 'stop-the-world' event of heavy rebalancing.

Essentially, no matter how many connector workers/tasks were running on the cluster before, they all stop processing whatever they were doing and jump into rebalancing mode, fighting to elect a leader.
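A quick way to gauge how badly the group is rebalancing is to count the heartbeat failures in the worker log. The log path and the sample lines written below are illustrative only; on a real node, point LOG at your actual connectDistributed.out.

```shell
# Count heartbeat failures caused by rebalancing in the worker log.
# Path and sample content are assumptions for illustration.
LOG=/tmp/connectDistributed.out
cat > "$LOG" <<'EOF'
[2021-08-12 14:07:48,932] INFO [Consumer clientId=connector-consumer-XYZ-0, groupId=connect-XYZ] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1054)
[2021-08-12 14:07:51,101] INFO [Consumer clientId=connector-consumer-XYZ-0, groupId=connect-XYZ] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1054)
EOF
grep -c 'heartbeat failed since group is rebalancing' "$LOG"
```

A steadily growing count across all nodes is a strong sign the group never settles.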

Why it happened!

One of your Connect worker properties files is set to connect.protocol=compatible

OR

Some other major change to the Connect worker properties, or a worker restart without pausing the running tasks first
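To find which worker is pinning the group to the old protocol, you can grep every worker properties file for the setting. The file path below is an assumption created for illustration; in this setup the real files would live under /apps/confluent-5.5.1/etc/kafka/.

```shell
# Find workers still configured with the eager (compatible) protocol.
# The properties path and content are assumptions for illustration.
PROPS=/tmp/connect-distributed-worker-cc.properties
cat > "$PROPS" <<'EOF'
group.id=connect-cluster-cc
connect.protocol=compatible
EOF
grep -H '^connect.protocol' "$PROPS"
```

Any worker still reporting connect.protocol=compatible will force the whole group back to stop-the-world rebalancing.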

Solution

rest.advertised.host.name=<FULLY QUALIFIED HOST NAME> OR <IP.ADDRESS>
rest.advertised.port=8083

I was able to solve this by following the steps below, in order -

  1. Stopped the Connect worker running with connect.protocol=compatible

  2. Stopped other Connect workers

  3. Added two properties to every worker properties file: rest.advertised.host.name= and rest.advertised.port=

  4. Restarted the Connect workers one by one and verified that the property below was picked up

[kafka@abchostnamekk01 logs]$ grep -ri 'info advertised' connectDistributed.out
[2021-08-12 14:06:50,809] INFO Advertised URI: http://abchostnamekk01.domain.com:8083
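The same check can be scripted so it fails loudly if a worker did not pick up the FQDN. The log path, sample line, and expected domain pattern below are assumptions; substitute your actual log and domain.

```shell
# Verify the worker advertised the expected FQDN after restart.
# Log path and sample line are assumptions for illustration.
LOG=/tmp/connectDistributed.out
echo '[2021-08-12 14:06:50,809] INFO Advertised URI: http://abchostnamekk01.domain.com:8083' > "$LOG"

URI=$(grep -o 'Advertised URI: [^ ]*' "$LOG" | awk '{print $3}')
case "$URI" in
  http://*.domain.com:8083) echo "advertised URI OK: $URI" ;;
  *)                        echo "unexpected advertised URI: $URI"; exit 1 ;;
esac
```

Run this on every node; a single worker advertising the wrong host is enough to destabilize the group.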

The right way to get rid of OutOfMemoryError is to increase the Xms and Xmx memory allocation variables in the process configuration and restart the process gracefully.

Check whether the existing process already sets a heap variable like the one shown below

./connect-distributed: export KAFKA_HEAP_OPTS="-Xms6G -Xmx6G" 

Check the output of free -m or top on the target server

KiB Mem : 32304516 total, 288648 free, 17298612 used, 

Change the memory allocation limits according to the memory available on the system

./connect-distributed:export KAFKA_HEAP_OPTS="-Xmx28G -Xms24G" 
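One way to derive such a value from the free -m numbers is to subtract a fixed headroom for the OS and page cache. The sample MEM_LINE and the 4 GB headroom below are assumptions; on a real host capture the line with free -m instead.

```shell
# Derive a heap size from `free -m` output, leaving OS headroom.
# On a real host: MEM_LINE=$(free -m | awk '/^Mem:/')
MEM_LINE="Mem:          31547       16893         281         120       14372       14200"

TOTAL_MB=$(echo "$MEM_LINE" | awk '{print $2}')   # total RAM in MB
HEADROOM_MB=4096                                  # ~4 GB for OS/page cache (assumption)
HEAP_MB=$(( TOTAL_MB - HEADROOM_MB ))
echo "export KAFKA_HEAP_OPTS=\"-Xms${HEAP_MB}m -Xmx${HEAP_MB}m\""
```

Setting Xms equal to Xmx, as in the original worker flags, avoids heap resizing pauses under load.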

Gracefully stop the process on the console using the flag below. If -SIGTERM doesn't work, use -SIGKILL.

kill -SIGTERM <PID> 
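The stop-then-escalate sequence can be sketched as below; a dummy sleep process stands in for the ConnectDistributed JVM, which is an assumption for illustration.

```shell
# Gracefully stop a process, escalating to SIGKILL if SIGTERM is ignored.
# A dummy `sleep` stands in for the ConnectDistributed JVM here.
sleep 300 &
PID=$!

kill -SIGTERM "$PID"                  # ask for a clean shutdown
for _ in 1 2 3 4 5; do                # wait up to ~5 s for it to exit
  kill -0 "$PID" 2>/dev/null || break
  sleep 1
done
if kill -0 "$PID" 2>/dev/null; then
  kill -SIGKILL "$PID"                # last resort
fi

wait "$PID" 2>/dev/null
kill -0 "$PID" 2>/dev/null && echo "still running" || echo "stopped"
```

On a real worker, find the PID with something like pgrep -f ConnectDistributed before sending the signal.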

Restart the service using the usual restart command

/apps/confluent-5.5.1/bin/connect-distributed -daemon /apps/confluent-5.5.1/etc/kafka/connect-distributed-worker1.properties 

After this restart, everything should stabilize.
