Introduction to Apache Kafka Shiao-An Yuan @sayuan 2017-12-13
Kafka Overview ● Developed by LinkedIn ● Written in Scala ● Message queue [Diagram: Producer pushes messages into a Kafka topic (offsets 0..8); Consumer pulls; ZooKeeper coordinates the brokers]
Why Do We Need Message Queues? ● Buffering ● Decoupling
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
Use Cases ● Messaging ● Website Activity Tracking ● Metrics ● Log Aggregation ● Stream Processing ● Event Sourcing ● Commit Log
Features ● Durability ● Scalability ● Publish-subscribe ● Ordering ● High Availability ● High Throughput ● Delivery Semantics
Let’s Add These Features One by One [Diagram: Producer pushes into a topic (offsets 0..8); Consumer pulls]
Durability
Segment Files [Diagram: Producer → Kafka topic (offsets 0..9), persisted as segment files on disk → Consumer]
Scalability
Partitioning [Diagram: the topic is split into Partition 0 on Broker 1 and Partition 1 on Broker 2; one Producer writes, one Consumer reads]
Partitioning [Diagram: as before, now with multiple Producers writing across both partitions]
Partitioning [Diagram: multiple Producers, and consumers A1, A2, A3 reading the two partitions]
Publish-subscribe
Consumer Groups [Diagram: Consumer Group A (A1, A2) and Consumer Group B (B1) each independently consume both partitions]
Ordering
Partitioned by Key [Diagram: the Producer routes each message by key; messages 2, 4, 6, 8, 10, 12, 14 land on Partition 0 (Broker 1) and 3, 5, 7, 9, 11, 13 on Partition 1 (Broker 2); consumers A1 and A2 in Group A each own one partition]
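Roughly, the producer hashes each record’s key and takes it modulo the partition count, so every record with the same key lands on the same partition and keeps its order. A conceptual sketch (the real default partitioner murmur2-hashes the serialized key bytes; this stand-in only illustrates the idea):

// Simplified stand-in for Kafka's default partitioner (really murmur2 over key bytes)
def partitionFor(key: String, numPartitions: Int): Int =
  math.abs(key.hashCode) % numPartitions

// The same key always maps to the same partition, so its messages stay ordered
assert(partitionFor("user-42", 2) == partitionFor("user-42", 2))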
High Availability
Replication [Diagram: Broker 1 holds the Leader of Partition 0 and a Follower of Partition 1; Broker 2 holds the Leader of Partition 1 and a Follower of Partition 0]
Only Produce to / Consume from Leader [Diagram: Producer and Consumer talk only to each partition’s Leader; Followers never serve clients]
Follower Acts Like a Consumer [Diagram: each Follower pulls new messages from its partition’s Leader, just as a consumer would]
High Throughput ● Constant time suffices ● End-to-end batch compression ● Sequential access ● OS page cache ● Zero-copy
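The “end-to-end batch compression” point is a producer-side setting: whole batches are compressed once on the producer and stored and fetched in compressed form. A minimal sketch (the sizes and codec are illustrative, not tuned recommendations):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536")        // accumulate up to 64 KB per partition
props.put(ProducerConfig.LINGER_MS_CONFIG, "10")            // wait up to 10 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") // compress each batch as a unit

val producer = new KafkaProducer[String, String](props)
(1 to 1000).foreach(i => producer.send(new ProducerRecord[String, String]("my-topic", s"message-$i")))
producer.close()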
Sequential Access http://queue.acm.org/detail.cfm?id=1563874
https://www.slideshare.net/popcornylu/jcconf-apache-kafka
Zero-copy https://www.ibm.com/developerworks/linux/library/j-zerocopy/
Benchmark
Pulling & Offset [Diagram: Consumer Groups A (A1, A2) and B (B1) pull from the partition leaders, each group tracking its own offsets]
Delivery Semantics ● At least once ● At most once ● Exactly once [Diagram: the guarantees apply to both hops, Producer → Kafka and Kafka → Consumer]
Delivery Semantics (Produce) ● At least once ○ Retry if no ACK ● At most once ○ Never retry ● Exactly once ○ Idempotence (new in 0.11)
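A sketch of enabling the idempotent producer (the enable.idempotence setting is real, new in 0.11; topic and values here are illustrative). The broker de-duplicates retried batches per partition, so retries stop producing duplicates:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")     // broker de-duplicates retried batches
props.put(ProducerConfig.ACKS_CONFIG, "all")                    // required by idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString) // retry freely without duplicating

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"))
producer.close()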
Delivery Semantics (Consume) ● At least once ○ Process messages, then commit offsets ● At most once ○ Commit offsets, then process messages ● Exactly once ○ At least once & idempotence
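A sketch of the two commit orderings with manual offset commits (the process handler is hypothetical; auto-commit is disabled so the commit point is explicit):

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
props.put("enable.auto.commit", "false") // commit manually so the ordering below matters
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("my-topic"))

def process(r: ConsumerRecord[String, String]): Unit = println(r.value) // hypothetical handler

// At least once: process, then commit — a crash in between replays the batch.
val batch = consumer.poll(1000)
batch.asScala.foreach(process)
consumer.commitSync()

// At most once: commit, then process — a crash in between drops the batch.
val next = consumer.poll(1000)
consumer.commitSync()
next.asScala.foreach(process)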
Producer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val f: java.util.concurrent.Future[RecordMetadata] =
  producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"))
f.get // block until the broker acknowledges (synchronous send)
producer.close()
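send is asynchronous; blocking on the Future, as above, makes it synchronous. A sketch of the non-blocking callback form, reusing the producer defined above (before it is closed):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"), new Callback {
  // Invoked once the broker acknowledges the record, or with a non-null exception on failure
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) exception.printStackTrace()
    else println(s"partition=${metadata.partition} offset=${metadata.offset}")
})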
Consumer

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1") // consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("my-topic"))
while (true) {
  val records = consumer.poll(1000) // timeout for long poll
  for (record <- records.asScala)
    println(record.partition, record.offset, record.key, record.value)
  consumer.commitSync()
}
Describe Topic

$ bin/kafka-topics.sh --describe --zookeeper "$zookeeper" --topic "test"
Topic:test  PartitionCount:8  ReplicationFactor:2  Configs:retention.ms=7200000
    Topic: test  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: test  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
    Topic: test  Partition: 2  Leader: 3  Replicas: 3,4  Isr: 3,4
    Topic: test  Partition: 3  Leader: 4  Replicas: 4,1  Isr: 1,4
    Topic: test  Partition: 4  Leader: 1  Replicas: 1,3  Isr: 1,3
    Topic: test  Partition: 5  Leader: 2  Replicas: 2,4  Isr: 2,4
    Topic: test  Partition: 6  Leader: 3  Replicas: 3,1  Isr: 1,3
    Topic: test  Partition: 7  Leader: 4  Replicas: 4,2  Isr: 4,2
Consumer Offset Checker

$ bin/kafka-consumer-groups.sh --bootstrap-server "localhost:9092" --describe --group "consumer1"
Group      Topic  Pid  Offset  logSize  Lag
consumer1  test   0    60057   80539    20482
consumer1  test   1    47632   66548    18916
consumer1  test   2    11099   30020    18921
consumer1  test   3    45640   65214    19574
consumer1  test   4    60408   61840    1432
consumer1  test   5    76674   96495    19821
consumer1  test   6    63305   82647    19342
consumer1  test   7    06678   25373    18695
Monitoring https://github.com/sayuan/kafka-offset-exporter
Cleanup Policy ● delete: discard old segments once retention limits are hit ● compact: retain the latest value for each key
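A sketch of creating a compacted topic programmatically with the AdminClient (available since 0.11); the topic name, partition count, and replication factor here are illustrative:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)

// cleanup.policy=compact keeps only the newest value per key instead of deleting by age
val topic = new NewTopic("user-profiles", 8, 2.toShort)
  .configs(Collections.singletonMap("cleanup.policy", "compact"))
admin.createTopics(Collections.singleton(topic)).all().get()
admin.close()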
Compaction
Old / New Consumer API ● Old API ○ --zookeeper localhost:2181 ○ Offsets stored in ZooKeeper ● New API ○ --bootstrap-server localhost:9092 ○ Offsets stored in Kafka’s internal topic (__consumer_offsets)
About Partitions ● The number of partitions can only be increased, never decreased ● Configs (broker & topic) ○ retention.ms ○ retention.bytes ○ segment.bytes
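A sketch of growing a topic with the AdminClient’s createPartitions call (added in Kafka 1.0); note that existing keys may hash to different partitions afterwards, which breaks per-key ordering across the change:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)

// Grow "test" from 8 to 16 partitions; shrinking is not supported and would fail
admin.createPartitions(Collections.singletonMap("test", NewPartitions.increaseTo(16)))
  .all().get()
admin.close()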
Adding Brokers ● Reassigning partitions ○ Load balancing ○ Overhead ○ Can be throttled ● SiftScience/kafka-assigner ○ Minimizes data movement ○ ekoontz/kafka-assigner
https://www.confluent.io/blog/hello-world-kafka-connect-kafka-streams/
Kafka Connect ● Problems it solves ○ Schema management ○ Fault tolerance ○ Parallelism ○ Latency ○ Delivery semantics ○ Operations and monitoring ● A framework ● Connectors developed by Confluent ○ S3, Elasticsearch, HDFS, JDBC ● 19 certified connectors developed by vendors ● 67 other connectors listed on the official site
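For a concrete feel, the Kafka quickstart ships a standalone file-source connector; its configuration is just a properties file handed to connect-standalone.sh:

# connect-file-source.properties (shipped with the Kafka quickstart)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# read lines from test.txt and produce them to topic connect-test
file=test.txt
topic=connect-test

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties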
Kafka Streams ● Problems it solves ○ Partitioning & scalability ○ Semantics & fault tolerance ○ State ○ Windowing & time ○ Re-processing ● A client library
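A minimal sketch of a Streams topology in Scala, using the 1.0 Java API (topic names and the uppercase transform are illustrative):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.ValueMapper
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo") // also serves as the consumer group id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

// Read from one topic, transform values, write to another
val builder = new StreamsBuilder()
builder.stream[String, String]("my-topic")
  .mapValues(new ValueMapper[String, String] { def apply(v: String): String = v.toUpperCase })
  .to("my-topic-upper")

val streams = new KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.close())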
https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
https://www.confluent.io/blog/chain-services-exactly-guarantees/
Thanks
