
I am planning a Spring + Kafka Streams application that handles incoming messages and stores updated internal state as a result of these messages. This state is predicted to reach ~500 MB per unique key (there are likely to be ~10k unique keys distributed across 2k partitions).
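As a rough back-of-envelope estimate, that is ~10k keys × ~500 MB ≈ 5 TB of state in total, or about 2.5 GB per partition on average, so an instance only has to pick up a few dozen partitions before it is holding well over 50 GB of state.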

This state must generally be held in memory for the application to operate effectively, but even if it were kept on disk I would still face a similar problem, just at a later point of scaling.

I am planning to deploy this application into a dynamically scaling environment such as AWS and will set a minimum number of instances, but I am wary of two situations:

  • On first startup (where perhaps just one consumer starts first), that consumer will not be able to handle taking assignment of all the partitions, because the in-memory state will overflow the instance's available memory.
  • After a major outage (e.g. an AWS availability zone outage), 33% of the consumers could be taken out of the group, and the additional memory load on the remaining instances could then take out everyone who remains.

How do people protect their consumers from taking on more partitions than they can handle such that they do not overflow available memory/disk?

1 Answer


See the kafka documentation.

Since 0.11...

[screenshot of the referenced Kafka documentation]

EDIT

For your second use case (and it also works for the first), perhaps you could implement a custom PartitionAssignor that limits the number of partitions assigned to each instance.

I haven't tried it; I don't know how the broker will react to the presence of unassigned partitions.

EDIT2

This seems to work ok; but YMMV...

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;

public class NoMoreThanFiveAssignor extends RoundRobinAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {

        // Start from the normal round-robin assignment, then cap each member at 5 partitions
        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
        assignments.forEach((memberId, assigned) -> {
            if (assigned.size() > 5) {
                System.out.println("Reducing assignments from " + assigned.size() + " to 5 for " + memberId);
                assignments.put(memberId, assigned.stream()
                        .limit(5)
                        .collect(Collectors.toList()));
            }
        });
        return assignments;
    }

}

and

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So54072362Application {

    public static void main(String[] args) {
        SpringApplication.run(So54072362Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        // 15 partitions, replication factor 1
        return new NewTopic("so54072362", 15, (short) 1);
    }

    @KafkaListener(id = "so54072362", topics = "so54072362")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            // Send one record to each of the 15 partitions so every partition has data
            for (int i = 0; i < 15; i++) {
                template.send("so54072362", i, "foo", "bar");
            }
        };
    }

}

and

spring.kafka.consumer.properties.partition.assignment.strategy=com.example.NoMoreThanFiveAssignor
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
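As an aside, not from the original answer: if you build your own consumer factory rather than relying on Boot's property support, the same strategy can be set programmatically. A minimal sketch, assuming String deserialization and a local broker:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class ConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "so54072362");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Same setting as the partition.assignment.strategy property above
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "com.example.NoMoreThanFiveAssignor");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

}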

and

Reducing assignments from 15 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:28.288  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 7
2019-01-07 15:24:28.289  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:28.296  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.304  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
Reducing assignments from 8 to 5 for consumer-2-c9a6928a-520c-4646-9dd9-4da14636744b
Reducing assignments from 7 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:46.310  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 8
2019-01-07 15:24:46.311  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:46.315  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
2019-01-07 15:24:58.330  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 9
2019-01-07 15:24:58.332  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
2019-01-07 15:24:58.336  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]

Of course, this leaves the unassigned partitions dangling, but it sounds like that's what you want, until the region comes back online.
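If you want visibility into how many partitions are left dangling, one option (a sketch only, not part of the original answer) is to compare the group's current assignment against the topic's partition count with the AdminClient; the bootstrap address is an assumption:

import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.TopicPartition;

public class DanglingPartitionChecker {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Partitions the group currently has assigned across all members
            ConsumerGroupDescription group = admin
                    .describeConsumerGroups(Set.of("so54072362"))
                    .all().get().get("so54072362");
            Set<TopicPartition> assigned = group.members().stream()
                    .flatMap(m -> m.assignment().topicPartitions().stream())
                    .collect(Collectors.toSet());

            // Total number of partitions on the topic
            int total = admin.describeTopics(Set.of("so54072362"))
                    .all().get().get("so54072362")
                    .partitions().size();

            System.out.println("Assigned: " + assigned.size() + " of " + total
                    + " partitions; dangling: " + (total - assigned.size()));
        }
    }

}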


Comments

Thanks kindly for the answer Gary. This would be a belt-and-braces approach requiring us to put some effort into capacity planning for the future, but it's not impossible to do and I like the simplicity. Another similar scenario I have recently become aware of is one of 'mass failure', e.g. the same situation occurs if an AWS availability zone were to fail and take out 1/3 of your consumers. Do you have any thoughts as to what to do in that situation too?
I have revised the initial question to make clear the 2 situations in which 'memory protection' might be needed. I'll mark yours as the accepted answer if no one suggests anything reasonable for scenario 2 within a decent time frame :)
The only thought that comes to my mind would be to manually assign partitions rather than using group management (see the sketch after these comments); but that pretty much eliminates dynamic scaling (and I'm not sure it's even possible with Streams).
I had another idea; I don't know how practical it is, but it seems to work; see the edits to my answer.
Your assignor suggestion sounds plausible, although this is a Kafka Streams application and Spring has already implemented an assignor (that presumably handles the logic of making sure the same partitions are assigned across multiple topics, stores, etc.), and whenever I've tried to change this I run into some pretty painful issues. Have you had any experience overriding the assignor in a Kafka Streams Spring app?
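For reference, the manual-assignment idea mentioned in the comments above would look roughly like this with a plain (non-Streams) Spring Kafka listener; this is only a sketch with illustrative partition numbers, not something from the original discussion:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class ManuallyAssignedListener {

    // This instance owns exactly these five partitions; no group-management rebalancing occurs.
    // Partition numbers are illustrative; each deployed instance would list a different set.
    @KafkaListener(id = "so54072362-manual", topicPartitions =
            @TopicPartition(topic = "so54072362", partitions = { "0", "1", "2", "3", "4" }))
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record);
    }

}

The obvious trade-off, as noted in the comment, is that scaling out then means redeploying instances with different partition lists rather than letting the group rebalance.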
