Intra-cluster Replication for Apache Kafka Jun Rao
About myself • Engineer at LinkedIn since 2010 • Worked on Apache Kafka and Cassandra • Database researcher at IBM
Outline • Overview of Kafka • Kafka architecture • Kafka replication design • Performance • Q/A
What’s Kafka • A distributed pub/sub messaging system • Used in many places – LinkedIn, Twitter, Box, FourSquare … • What do people use it for? – log aggregation – real-time event processing – monitoring – queuing
Example Kafka Apps at LinkedIn
Kafka Deployment at LinkedIn [Diagram: live services in the live data center produce interactive data (human, machine) into Kafka clusters, which feed monitoring and are mirrored into Kafka clusters and Hadoop in the offline data center] Per-day stats • writes: 10+ billion messages (2+ TB compressed data) • reads: 50+ billion messages
Kafka vs. Other Messaging Systems • Scale-out from the ground up • Persistence to disks • High throughput (10s of MB/sec per server) • Multi-subscription
Outline • Overview of Kafka • Kafka architecture • Kafka replication design • Performance • Q/A
Kafka Architecture [Diagram: producers publish to a cluster of brokers coordinated via Zookeeper; consumers pull from the brokers]
Terminology • Topic = message stream • Topic has partitions – partitions distributed to brokers • Partition has a log on disk – messages persisted in the log – messages addressed by offset
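As a toy illustration of the last two bullets (not Kafka's storage code), a partition can be modeled as an append-only log whose index is the message offset:

    // Toy model of a partition log: the offset is just the position in the log.
    import java.util.ArrayList;
    import java.util.List;

    public class PartitionLog {
        private final List<String> messages = new ArrayList<>();

        long append(String msg) {      // returns the offset assigned to the message
            messages.add(msg);
            return messages.size() - 1;
        }

        String read(long offset) {     // messages are addressed by offset, not by key
            return messages.get((int) offset);
        }

        public static void main(String[] args) {
            PartitionLog log = new PartitionLog();
            long off = log.append("msg1");
            System.out.println(off + " -> " + log.read(off)); // 0 -> msg1
        }
    }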
API
• Producer
    List<KeyedMessage<K,V>> messages = new ArrayList<KeyedMessage<K,V>>();
    messages.add(new KeyedMessage<K,V>("topic1", null, "msg1"));
    producer.send(messages);
• Consumer
    streams = consumer.createMessageStreams("topic1", 1);
    for (message : streams[0]) {
        // do something with message
    }
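Fleshed out, the 0.8-era Java API looks roughly like this (a sketch; the broker/Zookeeper addresses, group id, and topic name are illustrative):

    // A fuller version of the slide's snippets using the Kafka 0.8 Java API.
    import java.util.*;
    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.javaapi.producer.Producer;
    import kafka.message.MessageAndMetadata;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ApiExample {
        public static void main(String[] args) {
            // Producer: batch up KeyedMessages and send.
            Properties pprops = new Properties();
            pprops.put("metadata.broker.list", "broker1:9092");  // illustrative host
            pprops.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer = new Producer<>(new ProducerConfig(pprops));
            List<KeyedMessage<String, String>> messages = new ArrayList<>();
            messages.add(new KeyedMessage<String, String>("topic1", null, "msg1"));
            producer.send(messages);
            producer.close();

            // Consumer: one stream for topic1, iterate over messages as they arrive.
            Properties cprops = new Properties();
            cprops.put("zookeeper.connect", "zk1:2181");         // illustrative host
            cprops.put("group.id", "group1");
            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("topic1", 1));
            for (MessageAndMetadata<byte[], byte[]> message : streams.get("topic1").get(0)) {
                System.out.println(new String(message.message())); // do something with message
            }
        }
    }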
Deliver High Throughput • Simple storage – each partition's log on the broker (topic1:part1, topic2:part1, …) is a sequence of segment files (segment-1 … segment-n); messages (msg-1 … msg-n) are added via append() and served via read() using an offset index • Batched writes and reads • Zero-copy transfer from file to socket • Compression (batched)
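The zero-copy bullet corresponds to Java NIO's FileChannel.transferTo, which lets the kernel move log bytes from the file to the socket without copying them through user space. A minimal sketch (the segment file name and port are illustrative):

    // Zero-copy transfer of a log segment to a consumer socket (a sketch).
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;

    public class ZeroCopySend {
        public static void main(String[] args) throws IOException {
            try (FileChannel segment = new FileInputStream("segment-1.log").getChannel();
                 SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
                long pos = 0, remaining = segment.size();
                while (remaining > 0) {
                    // transferTo lets the kernel move bytes file -> socket directly (e.g., sendfile).
                    long sent = segment.transferTo(pos, remaining, socket);
                    pos += sent;
                    remaining -= sent;
                }
            }
        }
    }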
Outline • Overview of Kafka • Kafka architecture • Kafka replication design • Performance • Q/A
Why Replication • Broker can go down – controlled: rolling restart for code/config push – uncontrolled: isolated broker failure • If broker down – some partitions unavailable – could be permanent data loss • Replication → higher availability and durability
CAP Theorem • Pick two of – consistency – availability – partition tolerance
Kafka Replication: Pick CA • Brokers within a datacenter – i.e., network partitioning is rare • Strong consistency – replicas byte-wise identical • Highly available – typical failover time: < 10ms
Replicas and Layout • Partition has replicas • Replicas spread evenly among brokers [Diagram: logs on brokers 1–4, each broker holding three of the replicas of topic1-part1, topic1-part2, topic2-part1, and topic2-part2]
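One simple way to get such an even spread is round-robin placement; a toy sketch (illustrative, not Kafka's exact assignment algorithm):

    // Round-robin replica placement across brokers (illustrative sketch).
    import java.util.*;

    public class ReplicaLayout {
        static Map<String, List<Integer>> assign(List<String> partitions,
                                                 int numBrokers, int replicationFactor) {
            Map<String, List<Integer>> assignment = new LinkedHashMap<>();
            for (int p = 0; p < partitions.size(); p++) {
                List<Integer> replicas = new ArrayList<>();
                for (int r = 0; r < replicationFactor; r++)
                    replicas.add((p + r) % numBrokers + 1); // brokers numbered 1..numBrokers
                assignment.put(partitions.get(p), replicas);
            }
            return assignment;
        }

        public static void main(String[] args) {
            System.out.println(assign(
                List.of("topic1-part1", "topic1-part2", "topic2-part1", "topic2-part2"), 4, 3));
            // {topic1-part1=[1, 2, 3], topic1-part2=[2, 3, 4],
            //  topic2-part1=[3, 4, 1], topic2-part2=[4, 1, 2]}
        }
    }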
Maintain Strongly Consistent Replicas • One of the replicas is the leader • All writes go to the leader • Leader propagates writes to followers in order • Leader decides when to commit a message
Conventional Quorum-based Commit • Wait for majority of replicas (e.g., Zookeeper) • Plus: good latency • Minus: 2f+1 replicas → tolerate f failures – ideally want to tolerate 2f failures
Commit Messages in Kafka • Leader maintains in-sync replicas (ISR) – initially, all replicas in ISR – message committed once received by every replica in ISR – follower fails → dropped from ISR – leader commits using new ISR • Benefit: f replicas → tolerate f-1 failures (e.g., tolerating 2 failures takes 3 replicas here, vs. 5 with a majority quorum) – latency less an issue within a datacenter
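A toy model of the ISR commit rule (illustrative only, not Kafka's actual code): the leader advances the committed offset to the smallest offset acknowledged by every replica still in the ISR, so dropping a failed follower lets the commit point move forward.

    // Toy ISR commit rule (illustrative sketch).
    import java.util.*;

    public class IsrCommit {
        final Set<String> isr = new HashSet<>();          // replicas currently in sync
        final Map<String, Long> acked = new HashMap<>();  // last offset each replica has fetched
        long committed = -1;                              // last committed offset

        IsrCommit(Collection<String> replicas) { isr.addAll(replicas); }

        // A message is committed once every replica in the ISR has received it.
        void onFollowerFetch(String replica, long offset) {
            acked.put(replica, offset);
            maybeCommit();
        }

        // A failed or lagging follower is dropped; the commit point may then advance.
        void dropFromIsr(String replica) {
            isr.remove(replica);
            maybeCommit();
        }

        private void maybeCommit() {
            long min = Long.MAX_VALUE;
            for (String r : isr) min = Math.min(min, acked.getOrDefault(r, -1L));
            if (!isr.isEmpty() && min > committed) committed = min;
        }

        public static void main(String[] args) {
            IsrCommit p = new IsrCommit(List.of("A", "B", "C"));
            p.onFollowerFetch("A", 2); p.onFollowerFetch("B", 2); p.onFollowerFetch("C", 0);
            System.out.println(p.committed); // 0: C has only offset 0 so far
            p.dropFromIsr("C");
            System.out.println(p.committed); // 2: with ISR = {A,B}, offset 2 is committed
        }
    }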
Data Flow in Replication [Diagram: (1) producer sends to the leader of topic1-part1 on broker 1; (2) leader propagates to followers on brokers 2 and 3; (3) leader commits after follower acks; (4) ack returned to the producer; consumer reads committed messages from the leader]

  When producer receives ack | Latency              | Durability on failures
  no ack                     | no network delay     | some data loss
  wait for leader            | 1 network roundtrip  | a few messages may be lost
  wait for committed         | 2 network roundtrips | no data loss

• Only committed messages exposed to consumers – independent of the ack type chosen by the producer
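In the 0.8 producer, these three ack modes are selected with the request.required.acks config (0 = no ack, 1 = wait for leader, -1 = wait for committed). A minimal sketch, with illustrative broker hosts:

    // Selecting the producer ack mode in Kafka 0.8 (a sketch).
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AckModes {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // illustrative
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // 0 = no ack, 1 = wait for leader, -1 = wait for committed (all ISR replicas)
            props.put("request.required.acks", "-1");

            Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("topic1", null, "msg1"));
            producer.close();
        }
    }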
Extend to Multiple Partitions [Diagram: topic1-part1, topic2-part1, and topic3-part1 each replicated on three of brokers 1–4, with each partition's leader on a different broker] • Leaders are evenly spread among brokers
Handling Follower Failures • Leader maintains last committed offset – propagated to followers – checkpointed to disk • When follower restarts – truncate log to last committed offset – fetch data from leader – fully caught up → added to ISR
Handling Leader Failure • Use an embedded controller (inspired by Helix) – detect broker failure via Zookeeper – on leader failure: elect new leader from ISR – committed messages not lost • Leader and ISR written to Zookeeper – for controller failover – expected to change infrequently
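Because every ISR member already has all committed messages, the controller can safely elect any live ISR replica. A toy sketch (illustrative only; the real controller runs inside a broker and uses the leader/ISR state persisted in Zookeeper):

    // Toy leader election from the ISR (illustrative sketch).
    import java.util.List;
    import java.util.Optional;

    public class ElectLeader {
        // Any surviving ISR member has every committed message, so pick the first live one.
        static Optional<String> elect(List<String> isr, List<String> liveBrokers) {
            return isr.stream().filter(liveBrokers::contains).findFirst();
        }

        public static void main(String[] args) {
            // Leader A fails with ISR = {A, B, C}: B becomes the new leader.
            System.out.println(elect(List.of("A", "B", "C"), List.of("B", "C"))); // Optional[B]
        }
    }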
Example of Replica Recovery
1. ISR = {A,B,C}; leader A commits m1 (last committed)
     L (A): m1, m2, m3   F (B): m1, m2   F (C): m1
2. A fails and B is the new leader; ISR = {B,C}; B commits m2, but not m3
     (A, down): m1, m2, m3   L (B): m1, m2   F (C): m1, m2
3. B commits new messages m4, m5
     (A, down): m1, m2, m3   L (B): m1, m2, m4, m5   F (C): m1, m2, m4, m5
4. A comes back, truncates to m1, and catches up; finally ISR = {A,B,C}
     after truncation: F (A): m1   L (B): m1, m2, m4, m5   F (C): m1, m2, m4, m5
     after catch-up:   F (A): m1, m2, m4, m5   L (B): m1, m2, m4, m5   F (C): m1, m2, m4, m5
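Step 4's truncate-and-catch-up can be sketched directly on this example (a toy, not Kafka's log code): A drops everything past its checkpointed committed offset, then fetches the rest of the log from the new leader B.

    // Toy version of step 4: truncate to the committed offset, then re-fetch from the leader.
    import java.util.ArrayList;
    import java.util.List;

    public class FollowerRecovery {
        static void recover(List<String> followerLog, long lastCommitted, List<String> leaderLog) {
            // 1. Truncate everything past the last committed offset (discards uncommitted m3).
            while (followerLog.size() > lastCommitted + 1)
                followerLog.remove(followerLog.size() - 1);
            // 2. Fetch the missing suffix from the leader until fully caught up.
            for (int i = followerLog.size(); i < leaderLog.size(); i++)
                followerLog.add(leaderLog.get(i));
            // 3. The leader then adds this replica back to the ISR.
        }

        public static void main(String[] args) {
            List<String> a = new ArrayList<>(List.of("m1", "m2", "m3")); // m3 never committed
            List<String> b = List.of("m1", "m2", "m4", "m5");            // new leader B's log
            recover(a, 0, b);       // A's checkpointed last committed offset is 0 (m1)
            System.out.println(a);  // [m1, m2, m4, m5] -- byte-wise identical to B
        }
    }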
Outline • Overview of Kafka • Kafka architecture • Kafka replication design • Performance • Q/A
Setup • 3 brokers • 1 topic with 1 partition • Replication factor = 3 • Message size = 1 KB
Choosing between Latency and Durability

  When producer receives ack | Time to publish a message (ms) | Durability on failures
  no ack                     | 0.29                           | some data loss
  wait for leader            | 1.05                           | a few messages may be lost
  wait for committed         | 2.05                           | no data loss
Producer Throughput [Charts: throughput (MB/s, 0–70) vs. messages per send (1, 10, 100, 1000) and vs. # of concurrent producers (1, 5, 10, 20), each with series for no ack, leader, and committed]
Consumer Throughput [Chart: throughput (MB/s, 0–100) vs. fetch size (1KB, 10KB, 100KB, 1MB)]
Q/A • Kafka 0.8.0 (intra-cluster replication) – expected to be released in March – various performance improvements in the future • Check out more about Kafka – http://kafka.apache.org/ • Kafka meetup tonight