Building stream processing applications with Apache Kafka® using KSQL @rmoff #KafkaSummit
PROCESSING STREAM
PROCESSING STREAM
PROCESSING STREAM a of EVENTS
EVERYWHERE of EVENTS STREAMS ARE @rmoff
Building stream processing applications for Apache Kafka using KSQL @rmoff A Customer Experience
Building stream processing applications for Apache Kafka using KSQL @rmoff A Sale
Building stream processing applications for Apache Kafka using KSQL @rmoff A Sensor Reading
Building stream processing applications for Apache Kafka using KSQL @rmoff An Application Log Entry
Building stream processing applications for Apache Kafka using KSQL @rmoff Databases
Building stream processing applications for Apache Kafka using KSQL @rmoff Immutable event log
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Streams of events Time
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Stream: widgets Stream: widgets_red
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Stream: widgets Stream: widgets_red CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour='RED';
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Source stream
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Source stream
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Source stream Analytics Applications / Microservices
@rmoff #KafkaSummit Building stream processing applications for Apache Kafka using KSQL KSQL in action 🚀https://rmoff.dev/kssf19-ksql-code
Building stream processing applications for Apache Kafka using KSQL @rmoff
Building stream processing applications for Apache Kafka using KSQL @rmoff https://rmoff.dev/kssf19-ksql-code
DEMO https://rmoff.dev/kssf19-ksql-code
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Code! https://rmoff.dev/kssf19-ksql-code
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit MQTT + Kafka + KSQL + Elastic = ❤
Building stream processing applications for Apache Kafka using KSQL @rmoff
Building stream processing applications for Apache Kafka using KSQL @rmoff
Building stream processing applications for Apache Kafka using KSQL @rmoff http://confluent.cloud/signup
@rmoff #KafkaSummit Building stream processing applications for Apache Kafka using KSQL Interacting with KSQL 📬
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit KSQL - Confluent Control Center
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit KSQL - CLI
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit KSQL - REST API
@rmoff #KafkaSummit Building stream processing applications for Apache Kafka using KSQL KSQL operations and deployment 💾
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Interactive KSQL
 for development and testing Headless KSQL
 for Production Desired KSQL queries have been identified REST “Hmm, let me try
 out this idea...” KSQL in Development and Production
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit How to run KSQL KSQL Server (JVM process) …and many more… DEB, RPM, ZIP, TAR downloads http://confluent.io/ksql Docker images confluentinc/cp-ksql-server confluentinc/cp-ksql-cli
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Think Applications, not database instances
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Monitoring KSQL https://www.confluent.io/blog/troubleshooting-ksql-part-2 Confluent Control Center JMX
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit http://cnfl.io/book-bundle
#EOFhttps://talks.rmoff.net @rmoff #KafkaSummit 💬 Join the Confluent Community Slack group at http://cnfl.io/slack
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit •The Changing Face of ETL: 
 Event-Driven Architectures for Data Engineers • 📖 Slides • 📽 Recording •ATM Fraud detection with Kafka and KSQL • 📖 Slides • 👾 Code • 📽 Recording •Embrace the Anarchy: Apache Kafka's Role in Modern Data Architectures • 📖 Slides • 📽 Recording •Apache Kafka and KSQL in Action : Let’s Build a Streaming Data Pipeline! • 📖 Slides • 👾 Code • 📽 Recording •No More Silos: Integrating Databases and Apache Kafka • 📖 Slides • 👾 Code (MySQL) • 👾 Code (Oracle) • 📽 Recording Related Talks
Bonus content!
@rmoff #KafkaSummit Building stream processing applications for Apache Kafka using KSQL KSQL in action 🚀
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Filtering with KSQL ORDERS
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Filtering with KSQL ORDERS CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='New York'; KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Filtering with KSQL ORDERS ORDERS_NY CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='New York'; KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } KSQL CREATE STREAM ORDERS_NO_ADDRESS_DATA AS SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS FROM ORDERS;
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS ORDERS_NO_ADDRESS_DATA { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } { "order_ts": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 } KSQL CREATE STREAM ORDERS_NO_ADDRESS_DATA AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP, ORDERID, ITEMID, ORDERUNITS FROM ORDERS;
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS CREATE STREAM ORDERS_FLAT AS SELECT […] ADDRESS->STREET AS ADDRESS_STREET, ADDRESS->CITY AS ADDRESS_CITY, ADDRESS->STATE AS ADDRESS_STATE FROM ORDERS; { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS ORDERS_FLAT CREATE STREAM ORDERS_FLAT AS SELECT […] ADDRESS->STREET AS ADDRESS_STREET, ADDRESS->CITY AS ADDRESS_CITY, ADDRESS->STATE AS ADDRESS_STATE FROM ORDERS; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"} { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Reserialising data with KSQL ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"}
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Reserialising data with KSQL ORDERS CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED') AS SELECT * FROM ORDERS_FLAT; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"} KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Reserialising data with KSQL ORDERS ORDERS_CSV CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED) AS SELECT * FROM ORDERS_FLAT; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"} 1560045914101,24644,Item_0,1,43078 De 1560047305664,24643,Item_29,3,209 Mon 1560057079799,24642,Item_38,18,3 Autu 1560088652051,24647,Item_6,6,82893 Ar 1560105559145,24648,Item_0,12,45896 W 1560108336441,24646,Item_33,4,272 Hef 1560123862235,24641,Item_15,16,0 Dort 1560124799053,24645,Item_12,1,71 Knut KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} ITEMS { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS CREATE STREAM ORDERS_ENRICHED AS SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE, FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID ; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} ITEMS { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 } KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS ORDERS_ENRICHED CREATE STREAM ORDERS_ENRICHED AS SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE, FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID ; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} ITEMS { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 } { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9, "total_order_value": 99.5 } KSQL
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Connecting to other systems with Kafka Connect CREATE STREAM ORDERS_ENRICHED AS SELECT […] FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID ; KSQL Kafka Connect
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stateful Aggregation with KSQL ORDERS
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stateful Aggregation with KSQL SELECT MAKE, COUNT(*) AS ORDER_COUNT FROM ORDERS_ENRICHED GROUP BY MAKE; ORDERS
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Stateful Aggregation with KSQL SELECT MAKE, COUNT(*) AS ORDER_COUNT FROM ORDERS_ENRICHED GROUP BY MAKE; ORDERS
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - merge streams ORDERS ORDERS_UKUS US UK UK
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - merge streams ORDERS ORDERS_UK INSERT INTO ORDERS_COMBINED SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS; INSERT INTO ORDERS_COMBINED SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_UK; US US UK UK
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - merge streams ORDERS ORDERS_UK ORDERS_COMBINED INSERT INTO ORDERS_COMBINED SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_UK; INSERT INTO ORDERS_COMBINED SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS; US US UK UK USUS UK UK
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - split streams ORDERS_COMBINED US US UKUK
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - split streams ORDERS_COMBINED CREATE STREAM ORDERS_US AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='US'; CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='UK'; US US UKUK
Building stream processing applications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - split streams ORDERS_COMBINED CREATE STREAM ORDERS_US AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='US'; CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='UK'; ORDERS_US ORDERS_UK US US UKUK USUS UK UK

Building Stream Processing Applications with Apache Kafka Using KSQL (Robin Moffatt, Confluent) Kafka Summit SF 2019

  • 1.
    Building stream processingapplications with Apache Kafka® using KSQL @rmoff #KafkaSummit
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff A Customer Experience
  • 7.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff A Sale
  • 8.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff A Sensor Reading
  • 9.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff An Application Log Entry
  • 10.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff Databases
  • 11.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff Immutable event log
  • 12.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Streams of events Time
  • 13.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Stream: widgets Stream: widgets_red
  • 14.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Stream: widgets Stream: widgets_red CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour='RED';
  • 15.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Source stream
  • 16.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Source stream
  • 17.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stream Processing with KSQL Source stream Analytics Applications / Microservices
  • 18.
    @rmoff #KafkaSummit Building streamprocessing applications for Apache Kafka using KSQL KSQL in action 🚀https://rmoff.dev/kssf19-ksql-code
  • 20.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff
  • 21.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff https://rmoff.dev/kssf19-ksql-code
  • 22.
  • 23.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Code! https://rmoff.dev/kssf19-ksql-code
  • 24.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit MQTT + Kafka + KSQL + Elastic = ❤
  • 25.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff
  • 26.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff
  • 27.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff http://confluent.cloud/signup
  • 28.
    @rmoff #KafkaSummit Building streamprocessing applications for Apache Kafka using KSQL Interacting with KSQL 📬
  • 29.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit KSQL - Confluent Control Center
  • 30.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit KSQL - CLI
  • 31.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit KSQL - REST API
  • 32.
    @rmoff #KafkaSummit Building streamprocessing applications for Apache Kafka using KSQL KSQL operations and deployment 💾
  • 33.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Interactive KSQL
 for development and testing Headless KSQL
 for Production Desired KSQL queries have been identified REST “Hmm, let me try
 out this idea...” KSQL in Development and Production
  • 34.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit How to run KSQL KSQL Server (JVM process) …and many more… DEB, RPM, ZIP, TAR downloads http://confluent.io/ksql Docker images confluentinc/cp-ksql-server confluentinc/cp-ksql-cli
  • 36.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Think Applications, not database instances
  • 37.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Monitoring KSQL https://www.confluent.io/blog/troubleshooting-ksql-part-2 Confluent Control Center JMX
  • 38.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit http://cnfl.io/book-bundle
  • 39.
    #EOFhttps://talks.rmoff.net @rmoff #KafkaSummit 💬 Jointhe Confluent Community Slack group at http://cnfl.io/slack
  • 40.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit •The Changing Face of ETL: 
 Event-Driven Architectures for Data Engineers • 📖 Slides • 📽 Recording •ATM Fraud detection with Kafka and KSQL • 📖 Slides • 👾 Code • 📽 Recording •Embrace the Anarchy: Apache Kafka's Role in Modern Data Architectures • 📖 Slides • 📽 Recording •Apache Kafka and KSQL in Action : Let’s Build a Streaming Data Pipeline! • 📖 Slides • 👾 Code • 📽 Recording •No More Silos: Integrating Databases and Apache Kafka • 📖 Slides • 👾 Code (MySQL) • 👾 Code (Oracle) • 📽 Recording Related Talks
  • 41.
  • 42.
    @rmoff #KafkaSummit Building streamprocessing applications for Apache Kafka using KSQL KSQL in action 🚀
  • 43.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Filtering with KSQL ORDERS
  • 44.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Filtering with KSQL ORDERS CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='New York'; KSQL
  • 45.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Filtering with KSQL ORDERS ORDERS_NY CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='New York'; KSQL
  • 46.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
  • 47.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } KSQL CREATE STREAM ORDERS_NO_ADDRESS_DATA AS SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS FROM ORDERS;
  • 48.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS ORDERS_NO_ADDRESS_DATA { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } { "order_ts": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 } KSQL CREATE STREAM ORDERS_NO_ADDRESS_DATA AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP, ORDERID, ITEMID, ORDERUNITS FROM ORDERS;
  • 49.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
  • 50.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS CREATE STREAM ORDERS_FLAT AS SELECT […] ADDRESS->STREET AS ADDRESS_STREET, ADDRESS->CITY AS ADDRESS_CITY, ADDRESS->STATE AS ADDRESS_STATE FROM ORDERS; { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } KSQL
  • 51.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Schema manipulation with KSQL ORDERS ORDERS_FLAT CREATE STREAM ORDERS_FLAT AS SELECT […] ADDRESS->STREET AS ADDRESS_STREET, ADDRESS->CITY AS ADDRESS_CITY, ADDRESS->STATE AS ADDRESS_STATE FROM ORDERS; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"} { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } } KSQL
  • 52.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Reserialising data with KSQL ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"}
  • 53.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Reserialising data with KSQL ORDERS CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED') AS SELECT * FROM ORDERS_FLAT; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"} KSQL
  • 54.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Reserialising data with KSQL ORDERS ORDERS_CSV CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED) AS SELECT * FROM ORDERS_FLAT; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"} 1560045914101,24644,Item_0,1,43078 De 1560047305664,24643,Item_29,3,209 Mon 1560057079799,24642,Item_38,18,3 Autu 1560088652051,24647,Item_6,6,82893 Ar 1560105559145,24648,Item_0,12,45896 W 1560108336441,24646,Item_33,4,272 Hef 1560123862235,24641,Item_15,16,0 Dort 1560124799053,24645,Item_12,1,71 Knut KSQL
  • 55.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
  • 56.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} ITEMS { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
  • 57.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS CREATE STREAM ORDERS_ENRICHED AS SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE, FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID ; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} ITEMS { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 } KSQL
  • 58.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Lookups and Joins with KSQL ORDERS ORDERS_ENRICHED CREATE STREAM ORDERS_ENRICHED AS SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE, FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID ; {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} ITEMS { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 } { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9, "total_order_value": 99.5 } KSQL
  • 59.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Connecting to other systems with Kafka Connect CREATE STREAM ORDERS_ENRICHED AS SELECT […] FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID ; KSQL Kafka Connect
  • 60.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stateful Aggregation with KSQL ORDERS
  • 61.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stateful Aggregation with KSQL SELECT MAKE, COUNT(*) AS ORDER_COUNT FROM ORDERS_ENRICHED GROUP BY MAKE; ORDERS
  • 62.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Stateful Aggregation with KSQL SELECT MAKE, COUNT(*) AS ORDER_COUNT FROM ORDERS_ENRICHED GROUP BY MAKE; ORDERS
  • 63.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - merge streams ORDERS ORDERS_UKUS US UK UK
  • 64.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - merge streams ORDERS ORDERS_UK INSERT INTO ORDERS_COMBINED SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS; INSERT INTO ORDERS_COMBINED SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_UK; US US UK UK
  • 65.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - merge streams ORDERS ORDERS_UK ORDERS_COMBINED INSERT INTO ORDERS_COMBINED SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_UK; INSERT INTO ORDERS_COMBINED SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS; US US UK UK USUS UK UK
  • 66.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - split streams ORDERS_COMBINED US US UKUK
  • 67.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - split streams ORDERS_COMBINED CREATE STREAM ORDERS_US AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='US'; CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='UK'; US US UKUK
  • 68.
    Building stream processingapplications for Apache Kafka using KSQL @rmoff #KafkaSummit Transform data with KSQL - split streams ORDERS_COMBINED CREATE STREAM ORDERS_US AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='US'; CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='UK'; ORDERS_US ORDERS_UK US US UKUK USUS UK UK