A simple CLI application to interact with the Kafka Connect REST API. The initial idea is to support mostly GET operations, plus a few PUT/POST calls to pause and resume connectors.
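Under the hood these operations map onto plain Kafka Connect REST calls. A rough sketch of the equivalent curl commands, assuming the local port 38083 from the compose setup below and using postgres-sample-connector as an example connector name:

```shell
# Kafka Connect REST endpoint exposed by the compose setup below
CONNECT_URL="http://localhost:38083"

# GET operations: list connectors and inspect one connector's status
curl -s "$CONNECT_URL/connectors"
curl -s "$CONNECT_URL/connectors/postgres-sample-connector/status"

# PUT operations: pause and resume a connector (no request body needed)
curl -s -X PUT "$CONNECT_URL/connectors/postgres-sample-connector/pause"
curl -s -X PUT "$CONNECT_URL/connectors/postgres-sample-connector/resume"
```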
There's a custom Dockerfile in case new connectors need to be installed. A docker-compose.yaml creates the resources listed below to simplify testing while developing.
$ docker-compose up --build -d
```
Name            Command                         State          Ports
--------------------------------------------------------------------------------------------------------------
kafka           /etc/confluent/docker/run       Up (healthy)   0.0.0.0:39092->39092/tcp, 9092/tcp
kafka-connect   bash -c echo "Installing C ...  Up (healthy)   0.0.0.0:38083->8083/tcp, 9092/tcp
kowl-ui         /bin/sh -c sleep 10 && /ap ...  Up             0.0.0.0:38080->8080/tcp
postgres        docker-entrypoint.sh postg ...  Up (healthy)   0.0.0.0:5432->5432/tcp
zookeeper       /etc/confluent/docker/run       Up             2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
```

kowl-ui: http://localhost:38080
kafka-connect: http://localhost:38083
postgres: localhost:5432 (user: kafkaconnect / password: kafkaconnect)
The postgres container is ready to accept Kafka Connect connections: it enables logical replication and creates a publication for the records table. Check out the init-user-db script for more context on this topic.
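As a rough sketch of what that setup involves (the publication name below is a hypothetical placeholder; see init-user-db for the actual statements and names used in this repo):

```sql
-- Hypothetical sketch only: the real statements live in init-user-db.
-- Logical replication also requires the server to run with wal_level = logical.
CREATE PUBLICATION records_publication FOR TABLE public.records;
```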
The connectors directory contains a JSON payload that configures a Postgres source connector using the kafkaconnect user and the records table. The schema is pretty simple:
```sql
CREATE TABLE records (
    role_id serial PRIMARY KEY,
    random_text TEXT
);
```

You can configure the Postgres source connector by running:
```shell
$ curl -S -X POST -H "Content-Type:application/json" \
    -d @setup/connectors-definition/postgres.json \
    http://localhost:38083/connectors
```

The API responds with the connector definition:

```json
{
  "name": "postgres-sample-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "sample",
    "database.hostname": "postgres",
    "database.password": "kafkaconnect",
    "database.port": "5432",
    "database.server.name": "docker-postgres",
    "database.user": "kafkaconnect",
    "errors.log.include.messages": "true",
    "errors.retry.delay.max.ms": "60000",
    "errors.retry.timeout": "-1",
    "heartbeat.action.query": "CREATE TABLE IF NOT EXISTS public.debezium_heartbeat_sample (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE); INSERT INTO public.debezium_heartbeat_sample (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts",
    "heartbeat.interval.ms": "10000",
    "hstore.handling.mode": "json",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "plugin.name": "pgoutput",
    "signal.data.collection": "public.debezium_signal_sample",
    "slot.name": "repl_slot_sample",
    "table.include.list": "public.records",
    "tasks.max": "1",
    "topic.prefix": "postgres.",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "name": "postgres-sample-connector"
  },
  "tasks": [],
  "type": "source"
}
```

If the containers end up in an unhealthy state, run docker-compose down --remove-orphans. Cleaning up previous volumes may help as well.
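To verify the pipeline end to end, you can insert a row and watch the change event arrive in Kafka. A sketch, assuming the service names from the compose file, a broker listener on 9092 inside the kafka container, and Debezium's default topic naming (derived from topic.prefix plus the schema and table; list the topics first to confirm the exact name):

```shell
# Insert a row into the captured table
docker-compose exec postgres \
  psql -U kafkaconnect -d sample \
  -c "INSERT INTO records (random_text) VALUES ('hello world');"

# List topics to find the change topic created by the connector
docker-compose exec kafka \
  kafka-topics --bootstrap-server localhost:9092 --list

# Assumed topic name: topic.prefix "postgres." joined with public.records;
# substitute whatever name the listing above actually shows
TOPIC="postgres..public.records"

# Consume the change events
docker-compose exec kafka \
  kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic "$TOPIC" --from-beginning --max-messages 1
```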