This repo demonstrates publishing DB changes in PostgreSQL to a React frontend via Kafka Connect and MQTT:
Assuming you have Docker Compose, curl and jq installed, just run `./run.sh` and that's it.
The `run.sh` script does the following (a rough sketch of such a script appears below):
- Start the infrastructure defined in docker-compose.yml.
- Wait until ksqlDB is ready, then run `init.sql` to create the KStreams, KTables and connectors.
- Create the Redis Search indexes with `ft.create`.
- Run `updateOdds.sh` to generate random changes in the odds table.
Then open the odds page of the React frontend: http://localhost:3000/odds/20240209/1
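For orientation, a minimal sketch of what such a `run.sh` can look like; the service names, file paths and commands are the ones used later in this README, and the real script may differ:

```bash
#!/usr/bin/env bash
# Hypothetical sketch of run.sh -- the actual script in this repo may differ.
set -euo pipefail

# Start the infrastructure defined in docker-compose.yml
docker compose up -d --build

# Wait until ksqlDB answers on its REST port (assumes 8088 is published to the host),
# then create the streams, tables and connectors
until curl -s http://localhost:8088/info > /dev/null; do sleep 2; done
docker exec ksqldb /bin/ksql --file /home/appuser/init-sql/init.sql http://localhost:8088

# Create the Redis Search index for odds (see the full ft.create statements below)
docker exec redis redis-cli ft.create odds on json PREFIX 1 all_odds: \
  schema '$.RACE_DATE' as race_date numeric '$.RACE_NO' as race_no numeric

# Generate random changes in the odds table
./updateOdds.sh
```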
To clean up the Docker runtime:

```bash
docker compose down
```

or

```bash
docker rm -f `docker ps -a -q`
```

Start the Docker Compose stack:

```bash
docker compose up -d --build
```

- kafka broker and controller server configuration reference
- Kafka Connect Configurations for Confluent Platform
- ksqlDB server configuration reference
- Docker image - redis-stack for RedisJSON
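Before creating connectors in the next steps, it helps to make sure the Kafka Connect REST API on port 8083 is actually reachable; a minimal check, assuming the port mapping used throughout this README:

```bash
# Poll the Kafka Connect REST API until it responds
until curl -s http://localhost:8083/connectors > /dev/null; do
  echo "waiting for Kafka Connect..."
  sleep 2
done
```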
Create the Postgres source connector with the Kafka Connect REST API:
```bash
curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-postgres-source-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://postgres-db/db1",
      "connection.user": "admin",
      "connection.password": "passwd",
      "table.types": "TABLE,VIEW",
      "table.whitelist": "race,v_race_horse,odds_forecast",
      "mode": "timestamp",
      "timestamp.column.name": "lastupd",
      "validate.non.null": "false",
      "poll.interval.ms": 2000,
      "topic.prefix": "postgres_src_",
      "transforms": "ValueToKey,ExtractValue",
      "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.ValueToKey.fields": "id",
      "transforms.ExtractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.ExtractValue.field": "id",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'
```

Check the connector status after creation:
```bash
curl localhost:8083/connectors/my-postgres-source-connector/status
```

Or, check the connector configuration:
```bash
curl localhost:8083/connectors/my-postgres-source-connector/config
```

Or, delete the connector if necessary:
```bash
curl -i -X DELETE http://localhost:8083/connectors/my-postgres-source-connector/
```

List all installed connectors:
```bash
curl "localhost:8083/connectors?expand=status&expand=info"
```
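Since jq is already a prerequisite, the expanded listing can be condensed into a quick per-connector state overview; a small sketch:

```bash
# Summarize every connector and its current state (assumes jq is installed)
curl -s "http://localhost:8083/connectors?expand=status" \
  | jq -r 'to_entries[] | [.key, .value.status.connector.state] | @tsv'
```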
- Kafka Connect Deep Dive - Converters and Serialization Explained
- Kafka Connect Self-managed Connectors for Confluent Platform
- Kafka Connect Single Message Transforms for Confluent
- Kafka Connect - How to use Single Message Transforms in Kafka Connect
- Debezium - open source change data capture project
- JDBC Source Connector for Confluent Platform
- JDBC Source Connector Configuration Properties
- PostgreSQL Source (JDBC) Connector for Confluent Cloud
Run the ksql-cli embedded in the ksqlDB container:
```bash
docker exec -it ksqldb /bin/ksql http://localhost:8088
docker exec -it ksqldb /bin/ksql --file /home/appuser/init-sql/init.sql http://localhost:8088
```

Create kStream odds from the postgres DB odds_forecast table:
```sql
CREATE STREAM odds (
  id string key,
  race_id string,
  first_leg decimal(2,0),
  second_leg decimal(2,0),
  odds decimal(5,3),
  sts string,
  ver int,
  lastUpd timestamp
) WITH (
  kafka_topic = 'postgres_src_odds_forecast',
  value_format = 'AVRO'
);
```

Create kTable race from the postgres DB race table:
```sql
CREATE TABLE race (
  id string primary key,
  race_date date,
  race_no decimal(2,0),
  race_time time,
  racecourse string,
  ver int,
  lastUpd timestamp
) WITH (
  kafka_topic = 'postgres_src_race',
  value_format = 'AVRO'
);

CREATE TABLE race_horse_tbl (
  id string primary key,
  race_date date,
  race_no decimal(2,0),
  draw decimal(2,0),
  horse string,
  jockey string,
  ver int,
  lastUpd timestamp
) WITH (
  KAFKA_TOPIC = 'postgres_src_v_race_horse',
  VALUE_FORMAT = 'AVRO'
);
```

Create a KStream odds_forecast by joining the odds KStream and the race KTable:
```sql
CREATE OR REPLACE STREAM odds_forecast WITH (
  KAFKA_TOPIC = 'all_odds',
  VALUE_FORMAT = 'AVRO'
) AS
SELECT
  o.id as odds_id,
  o.race_id,
  r.race_date,
  r.race_time,
  cast(r.race_no as int) as race_no,
  r.racecourse,
  cast(o.first_leg as int) as first_leg,
  cast(o.second_leg as int) as second_leg,
  'forecast' as type,
  cast(o.first_leg as varchar) + '-' + cast(o.second_leg as varchar) as pattern,
  cast(o.odds as double) as odds,
  o.sts,
  o.ver as ver,
  o.lastUpd as lastupd,
  'odds/forecast/' + format_date(r.race_date, 'yyyyMMdd') + '/' + cast(r.race_no as varchar) as mqtt_topic
FROM odds o
INNER JOIN race r ON o.race_id = r.id
PARTITION BY o.id
EMIT CHANGES;
```

Create another KStream on top of odds_forecast whose value is serialized as JSON instead of AVRO; it is for the MQTT sink connector, which does not support AVRO serialization:
```sql
CREATE OR REPLACE STREAM odds_json WITH (
  KAFKA_TOPIC = 'all_odds_json',
  VALUE_FORMAT = 'JSON'
) AS
SELECT * FROM odds_forecast EMIT CHANGES;

CREATE OR REPLACE TABLE race_horse WITH (
  KAFKA_TOPIC = 'race_horse',
  VALUE_FORMAT = 'JSON'
) AS
SELECT * FROM race_horse_tbl EMIT CHANGES;
```

Other ksqlDB commands, to list the different objects in ksqlDB:
```sql
show connectors;
show topics;
show streams;
show tables;
describe odds_forecast extended;
```

Operations related to queries:
```sql
show queries;
pause all;
resume all;
terminate all;
```

Set the query to read from the beginning of the topic:
```sql
SET 'auto.offset.reset' = 'earliest';
```
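To sanity-check that rows are flowing into the joined stream, ksqlDB's REST endpoint can also be queried directly from the host; a sketch, assuming port 8088 is published by docker-compose.yml:

```bash
# Pull a few rows from odds_forecast via the ksqlDB /query REST endpoint,
# reading from the beginning of the topic
curl -s -X POST http://localhost:8088/query \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d '{
        "ksql": "SELECT odds_id, race_no, pattern, odds, mqtt_topic FROM odds_forecast EMIT CHANGES LIMIT 5;",
        "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" }
      }'
```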
Create the redis-sink connectors with the Kafka Connect REST API:

```bash
curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-redis-sink-avro",
    "config": {
      "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
      "tasks.max": "1",
      "topics": "all_odds",
      "redis.uri": "redis://redis:6379",
      "redis.key": "${topic}",
      "redis.command": "JSONSET",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schemas.enable": "true",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'
```

```bash
curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-redis-sink-json",
    "config": {
      "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
      "tasks.max": "1",
      "topics": "race_horse",
      "redis.uri": "redis://redis:6379",
      "redis.key": "${topic}",
      "redis.command": "JSONSET",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'
```

Check connector status and configuration:
```bash
curl localhost:8083/connectors/my-redis-sink-avro/status
curl localhost:8083/connectors/my-redis-sink-json/status
curl localhost:8083/connectors/my-redis-sink-avro/config
curl localhost:8083/connectors/my-redis-sink-json/config
```

Delete the connectors:
```bash
curl -i -X DELETE http://localhost:8083/connectors/my-redis-sink-avro/
curl -i -X DELETE http://localhost:8083/connectors/my-redis-sink-json/
```

Tips!!! ksqlDB date values end up in Redis as epoch-day numbers (days since 1970-01-01); see the conversion example after the search queries below.
Run the redis-cli embedded in the Redis Docker image:
```bash
docker exec -it redis redis-cli
docker exec -it redis redis-cli keys '*'
```

Create the redisSearch indexes (Tips!!! the JSON properties defined in the ft.create statement are case-sensitive):
```
ft.create odds on json PREFIX 1 all_odds: schema $.RACE_DATE as race_date numeric $.RACE_NO as race_no numeric
ft.create horse on json PREFIX 1 race_horse: schema $.RACE_DATE as race_date numeric $.RACE_NO as race_no numeric $.DRAW as draw numeric sortable
```

Search queries for testing the index:
```
ft.search odds '@no:(1)'
ft.search odds '@pattern:(1-2)'
ft.search odds '@venue:(Sandown)'
ft.search odds '(@race_date:[19762 19762] @race_no:[1 1])'
```
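As the tip above notes, RACE_DATE lands in Redis as an epoch-day number (2024-02-09 is day 19762), so a calendar date has to be converted before it can be used in a range query; a small sketch, assuming GNU date on the host:

```bash
# Convert a calendar date to its epoch-day value and search with it
EPOCH_DAY=$(( $(date -u -d '2024-02-09' +%s) / 86400 ))   # 19762
docker exec -it redis redis-cli ft.search odds "(@race_date:[$EPOCH_DAY $EPOCH_DAY] @race_no:[1 1])"
```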
Operations for the redisSearch index: list all, show info, delete:

```
ft._list
ft.info odds
ft.dropindex odds
ft.dropindex horse
```

Other useful redis commands:
```
keys *
json.get postgres_src_odds_forecast:bef34f7f-d784-4995-ac82-e4840902b9a1 $
json.get all_odds:e3dcd46b-9d55-435d-9cb5-c0198be9a211 $
```

- Redis Search - Query data
- Redis command - ft.create
- Redis command - ft.search
- Redis command - ft.explain
- Redis command - json.get
- Redis command - json.set
Create the mqtt-sink connector (Tips!!! the kafka-mqtt-sink connector does not support AVRO serialization):
```bash
curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-mqtt-sink",
    "config": {
      "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
      "tasks.max": "1",
      "topics": "all_odds_json",
      "mqtt.server.uri": "tcp://hivemq:1883",
      "mqtt.qos": "2",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "transforms": "extractTopic",
      "transforms.extractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Key",
      "transforms.extractTopic.field": "mqtt_topic",
      "confluent.topic.bootstrap.servers": "kafka-broker:29092",
      "confluent.topic.replication.factor": "1"
    }
  }'
```

Check connector status and configuration:
```bash
curl localhost:8083/connectors/my-mqtt-sink/status
curl localhost:8083/connectors/my-mqtt-sink/config
```

Delete connector:
```bash
curl -i -X DELETE http://localhost:8083/connectors/my-mqtt-sink/
```

Run the HiveMQ MQTT CLI with its Docker image:
```bash
docker run -it \
  --rm --name=hivemq-cli2 \
  --network publish-msg-with-kafka-connect-mqtt-nextjs_default --link hivemq \
  hivemq/mqtt-cli shell
```

Check the server config file in HiveMQ:
```bash
docker exec -it hivemq cat conf/config.xml
```

Connect to the HiveMQ broker:
```
connect --host=hivemq --port=1883
```

Subscribe to the all_odds_json topic and wait for the messages:
```
sub -t all_odds_json --stay --jsonOutput
```

Publish to the all_odds_json topic if necessary:
```
pub -t all_odds_json -m 'Try Me!!'
```

Run psql embedded in the postgres db server:
```bash
docker exec -it postgres-db psql --host=localhost --username=admin --dbname=db1
```

Update all odds with random odds:
```sql
update odds_forecast set odds = random()*100, ver = ver + 1, lastupd = current_timestamp;

update odds_forecast set odds = random()*100, ver = ver + 1, lastupd = current_timestamp
where race_id = (select id from race where race_date = '2024-02-09' and race_no = 1);

update odds_forecast set odds = random()*100, ver = ver + 1, lastupd = current_timestamp
where first_leg = 2 and second_leg = 1
  and race_id = (select id from race where race_date = '2024-02-09' and race_no = 1);

update race_horse_jockey set ver = ver + 1, lastupd = current_timestamp;

select r.race_date, r.race_no, o.first_leg, o.second_leg, o.odds, o.ver, o.lastupd
from odds_forecast o
join race r on r.id = o.race_id
where r.race_date = '2024-02-09' and race_no = 1
order by lastupd desc
limit 5;
```
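updateOdds.sh presumably repeats an update like the first statement above in a loop; a hypothetical sketch of such a loop, run non-interactively from the host:

```bash
# Keep randomizing the odds so changes flow through Kafka Connect, ksqlDB and MQTT
while true; do
  docker exec postgres-db psql --username=admin --dbname=db1 -c \
    "update odds_forecast set odds = random()*100, ver = ver + 1, lastupd = current_timestamp;"
  sleep 5
done
```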
Subscribe to the HiveMQ "all_odds_json" topic over the mqtt (TCP) protocol with the MQTT.js package:

```bash
npx mqtt sub -t 'all_odds_json' -h 'localhost' -p '1883' -l 'mqtt' -i 'mqttjs-client-1' -v
npx mqtt sub -t 'odds-forecast-20240209-1-json' -h 'localhost' -p '1883' -l 'mqtt' -i 'mqttjs-client-1' -v
```

Subscribe to the HiveMQ "all_odds_json" topic over the ws (WebSocket) protocol with the MQTT.js package (not working):
```bash
npx mqtt sub -t 'all_odds_json' -h 'localhost/mqtt' -p '8000' -l 'ws' -i 'mqttjs-client-1' -v
```

- Horse API (http://192.168.19.130:3000/api/horse/20240210/1)
- Odds API (http://192.168.19.130:3000/api/odds/20240210/1)
- Odds page (http://192.168.19.130:3000/odds/20240209/1)
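A quick smoke test of the odds API route from the host, assuming the frontend is served locally on port 3000 as in the URL at the top of this README:

```bash
# Fetch the odds for race 1 on 2024-02-09 and pretty-print the JSON with jq
curl -s http://localhost:3000/api/odds/20240209/1 | jq .
```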
- Confluent Inc - demo-scene - building-a-stream-pipeline
- Confluent Inc - demo-scene - kafka-connect-zero-to-hero
- Robin Moffatt - From Zero to Hero with Kafka Connect
- Robin Moffatt - Apache Kafka and ksqlDB in Action: Let's Build a Streaming Data Pipeline
- Robin Moffatt - Twelve Days of SMT - Day 2: ValueToKey and ExtractField
- ksqlDB and Stream Processing Tutorials | ksqlDB 101
- ksqlDB & Advanced Stream Processing Tutorials | Inside ksqlDB

