
I set up ACLs for a 3-node Kafka cluster and am able to send and receive messages for a topic through the console producer and consumer. Now I want to configure Kafka Connect with ACLs. I have tried various SASL_PLAINTEXT combinations, and connect.log shows the following error. Data is not syncing from the source table to the topic; please help me find the configuration I am missing.

Error log:

[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)

My configuration is in the following files. I have listed the users in the jaas.conf file and set it in the environment.
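For reference, the jaas.conf mentioned above is not shown in the question; a minimal sketch of what such a file usually looks like for the PLAIN mechanism follows. The usernames and passwords here are placeholders, and the file is typically passed to each JVM via `KAFKA_OPTS=-Djava.security.auth.login.config=/path/to/jaas.conf`:

```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_producer="producer"
    user_connect="connect-secret";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="producer";
};
```

With PLAIN, each `user_<name>="<password>"` entry in the broker's `KafkaServer` section defines one account that clients may authenticate as.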

1: zookeeper.properties.

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
zookeeper.set.acl=true
jaasLoginRenew=3600000

2: server.properties

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<server_name>:9092
host.name=server_ip

3: schema-registry.properties

kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=PLAIN
metadataServerUrls=SASL_PLAINTEXT://<server_ip>:9092
zookeeper.set.acl=true
kafkastore.group.id=schema-registry-3
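Note that the schema-registry.properties above carries no credentials for the kafkastore Kafka client. If they are not supplied through the JVM's JAAS file, Schema Registry would typically also need something along these lines (placeholder credentials, assuming the `kafkastore.`-prefixed client-config passthrough):

```
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<server_ip>:9092
kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="schema-registry" \
  password="schema-registry-secret";
```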

4: connect-avro-distributed.properties

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

5: Source connector script

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "oracle-jdbc-source-mtx_domains_acl5",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@<ip>:<port>:<dbname>",
    "connection.user": "<username>",
    "connection.password": "password",
    "numeric.mapping": "best_fit",
    "table.whitelist": "TABLENAME",
    "mode": "timestamp",
    "timestamp.column.name": "CREATED_ON",
    "topic.prefix": "",
    "validate.non.null": "false",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "DOMAIN_CODE",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_PLAINTEXT",
    "producer.sasl.mechanism": "PLAIN",
    "producer.security.protocol": "SASL_PLAINTEXT",
    "producer.request.timeout.ms": 50000,
    "producer.retry.backoff.ms": 500,
    "offset.flush.timeout.ms": 50000,
    "producer.buffer.memory": 100,
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";",
    "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "delete.enabled": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}' http://localhost:8083/connectors
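A detail worth checking with the `producer.*` keys in that connector config: at the connector level, Kafka Connect (2.3+, KIP-458) expects client overrides under the `producer.override.` prefix, and the worker must opt in to them via its override policy; plain `producer.`-prefixed keys in a connector config are not applied as producer settings. A sketch, assuming Kafka 2.3 or later:

```
# worker side (connect-avro-distributed.properties):
# allow per-connector client-config overrides
connector.client.config.override.policy=All

# connector side, the override keys would then read:
#   "producer.override.security.protocol": "SASL_PLAINTEXT",
#   "producer.override.sasl.mechanism": "PLAIN",
#   "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";"
```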

1 Answer


You need to add the following properties to your connect-distributed.properties:

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

Source: Kafka Connect Security docs
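Besides the credentials, the `connect` principal also needs ACLs on the worker's internal topics and consumer group. A sketch with the kafka-acls CLI; the internal topic names, the `connect-cluster` group id, and the `admin.properties` client-config file are assumptions based on the defaults:

```shell
# grant the connect user access to the worker's internal topics
# (assumes default topic names and an admin.properties holding admin SASL credentials)
for topic in connect-offsets connect-configs connect-status; do
  kafka-acls --bootstrap-server localhost:9092 --command-config admin.properties \
    --add --allow-principal User:connect \
    --operation Read --operation Write --operation Create --topic "$topic"
done

# grant access to the worker's consumer group (group.id, default connect-cluster)
kafka-acls --bootstrap-server localhost:9092 --command-config admin.properties \
  --add --allow-principal User:connect \
  --operation Read --group connect-cluster
```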


2 Comments

I tried the same config but it didn't work. I am getting a timeout exception: "Timeout occurred while reading topic metadata." Could you please help with what the issue could be?
I finally made it work. The issue was with internal topic metadata creation on the ACL-enabled Kafka cluster. We need to provide the following properties when submitting a connector: "schema.history.internal.consumer.sasl.jaas.config", "schema.history.internal.consumer.sasl.mechanism", "schema.history.internal.consumer.security.protocol", and the same for the producer: "schema.history.internal.producer.sasl.jaas.config" plus the producer counterparts of the other two consumer properties above.
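Pulling that comment's fix together, the connector submission would gain entries along these lines. The credentials are placeholders, and the `schema.history.internal.*` prefix comes from the comment itself; it applies to connectors that maintain a schema history topic (e.g. Debezium source connectors), not to the plain JDBC source:

```
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"connect\" password=\"connect-secret\";",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"connect\" password=\"connect-secret\";"
```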
