
I am currently working on a project where we use Kafka as a message queue. We have two use cases: one is to consume messages in parallel, with no ordering of any kind required; the other is to preserve ordering based on the partition key provided when producing the message.

In Kafka, a given partition key always goes to the same partition, and multiple partition keys may map to the same partition.
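To make that mapping concrete, here is a simplified stand-in for Kafka's default partitioner (the real one uses murmur2 hashing, not hashCode); the only point it illustrates is that the partition is a deterministic function of the key:

```java
// Simplified illustration, NOT Kafka's actual partitioner: a deterministic
// hash of the key picks the partition, so one key always lands on the same
// partition, while distinct keys can collide onto the same partition.
public class KeyPartitioning {
    public static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is a valid partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-42", 6);
        int p2 = partitionFor("order-42", 6);
        System.out.println(p1 == p2); // same key, same partition: true
    }
}
```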

But we do not want a message with one partition key to block the processing of messages with different partition keys that happen to sit on the same partition.

With this context, we have written two implementations of our Kafka consumer: one processes messages in parallel without worrying about order, and one processes messages sequentially per partition key.

Now, I have created an abstract Kafka consumer class, which does not know whether messages are processed in parallel, sequentially, or by any other strategy. It holds all the configuration required for a consumer to start, and it calls four methods: poll, consume, commit and waitOptionally.

This abstract class has two implementations: one for parallel processing, which overrides consume, commit and waitOptionally, and one for sequential processing per partition key, which overrides the same three methods. poll is identical in both implementations and already lives in the abstract Kafka consumer.

Approach 1

Pseudocode:

public abstract class AbstractKafkaConsumerRunnable<T> implements Runnable {

    private boolean isAutoCommit;
    private Map<String, List<ConsumerRecord<String, T>>> partitionKeyVsConsumedRecords;

    abstract protected List<T> consume(List<T> records);

    protected List<T> poll() { .... }

    protected void commit(List<T> currentlyPolledRecords) {
        if (isAutoCommit) {
            return;
        }
        // logic to commit the records that are processed;
        // state maintained in partitionKeyVsConsumedRecords
    }

    protected void waitOptionally() { .... }

    @Override
    public void run() {
        // poll
        // consume
        // commit
        // waitOptionally
    }
}

public class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {

    private Map<String, List<MessagingEvent>> recordsVsPartitionKey;

    @Override
    protected void consume(List<MessagingEvent> records) {
        // sequential consumption
        // maintain state of record processing
        // call user's consume method
    }
}

public class ParallelKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {

    @Override
    protected void consume(List<MessagingEvent> records) {
        // parallel consumption
        // call user's consume method
    }
}

In this approach, I define a boolean auto-commit flag in my abstract class, and each implementation sets it: Sequential returns false and Parallel returns true. The commit logic in the abstract class branches on this flag, so commit is not implemented in the subclasses; consume and waitOptionally are.

Approach 2

Pseudocode:

public abstract class AbstractKafkaConsumerRunnable<T> implements Runnable {

    abstract protected void consume(List<T> records);
    abstract protected void commit(List<T> currentlyPolledRecords);
    abstract protected void waitOptionally();

    protected List<T> poll() { .... }

    @Override
    public void run() {
        // poll
        // consume
        // commit
        // waitOptionally
    }
}

public class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {

    private Map<String, List<ConsumerRecord<String, T>> partitionKeyVsconsumedRecords;
    private Map<String, List<MessagingEvent>> recordsVsPartitionKey;

    @Override
    protected void consume(List<MessagingEvent> records) {
        // sequential consumption
        // maintain state of record processing
        // call user's consume method
    }

    @Override
    protected void commit(List<T> currentlyPolledRecords) {
        // commit logic
        // maintain state of commits
    }

    @Override
    protected void waitOptionally() {
        // logic based on currently processing records
        // uses state as well
    }
}

public class ParallelKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {

    @Override
    protected void consume(List<MessagingEvent> records) {
        // parallel consumption
        // call user's consume method
    }

    @Override
    protected void waitOptionally() { .... }

    @Override
    protected void commit(List<T> currentlyPolledRecords) {
        return;
    }
}

In this approach, I've defined abstract methods consume, commit and waitOptionally and implemented them in both of my implementation classes.

Approach 3

Pseudocode:

public abstract class AbstractKafkaConsumerRunnable<T> implements Runnable {

    abstract protected List<T> consume(List<T> records);
    abstract protected List<T> commit(List<T> currentlyPolledRecords);
    abstract protected void waitOptionally();

    protected List<T> poll() { .... }

    @Override
    public void run() {
        // poll
        // consume
        // commit
        // waitOptionally
    }
}

public class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {

    private KafkaMessageCommitTracker kafkaMessageCommitTracker;
    private KafkaMessageConsumptionStore kafkaMessageConsumptionStore;

    @Override
    protected void consume(List<MessagingEvent> records) {
        // add in kafkaMessageConsumptionStore
        // call user's consume method
    }

    @Override
    protected void commit(List<T> currentlyPolledRecords) {
        // call kafkaMessageCommitTracker.commit()
    }

    @Override
    protected void waitOptionally() {
        // logic based on currently processing records
        // uses state as well
    }
}

public class ParallelKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {

    @Override
    protected void consume(List<MessagingEvent> records) {
        // parallel consumption
        // call user's consume method
    }

    @Override
    protected void waitOptionally() { .... }

    @Override
    protected void commit(List<T> currentlyPolledRecords) {
        return;
    }
}

public interface KafkaMessageCommitTracker {
    void commit();
    // ...other state-related functions like addRecord, pollRecord, isRecordPresent
}

public interface KafkaMessageConsumptionStore {
    void addRecord(String partitionKey);
    void pollRecord(String partitionKey);
    // ...other state-related functions like isRecordPresent, totalNumberOfRecords
}

This is similar to the approach above, but I've further extracted the commit and consume logic into separate classes (behind abstractions) and composed them into my Sequential class. The goal is one more level of granularity with respect to SOLID's SRP: if the commit logic changes, only the commit class changes, and no other code has to change with it. Only the Sequential class knows about these compositions, since only Sequential uses them; the abstract and parallel classes have no knowledge of these abstractions.

Approach 4

In this approach, I don't have any abstractions over the consumer itself: just one KafkaConsumer class that takes strategies as its constructor parameters (pollStrategy, commitStrategy, consumeStrategy, waitOptionallyStrategy). Clients can pass in whatever strategies they want to plug and play. Each strategy is an interface with multiple implementations, and the client can pass in any implementation.

Do keep in mind that some of the consume and commit strategies will have to maintain in-memory state as well, which I keep inside the strategy class without exposing it to the outside world.
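Approach 4 could be sketched roughly as follows; all names here are illustrative, and the in-memory stubs stand in for real Kafka polling and committing:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Approach 4 (names invented for illustration):
// one consumer class, with every step of the loop injected as a strategy.
public class StrategyConsumerSketch {
    interface PollStrategy<T>    { List<T> poll(); }
    interface ConsumeStrategy<T> { void consume(List<T> records); }
    interface CommitStrategy<T>  { void commit(List<T> records); }
    interface WaitStrategy       { void waitOptionally(); }

    static class KafkaConsumerRunner<T> {
        private final PollStrategy<T> poll;
        private final ConsumeStrategy<T> consume;
        private final CommitStrategy<T> commit;
        private final WaitStrategy wait;

        KafkaConsumerRunner(PollStrategy<T> poll, ConsumeStrategy<T> consume,
                            CommitStrategy<T> commit, WaitStrategy wait) {
            this.poll = poll;
            this.consume = consume;
            this.commit = commit;
            this.wait = wait;
        }

        // One iteration of the poll/consume/commit/wait loop.
        void runOnce() {
            List<T> records = poll.poll();
            consume.consume(records);
            commit.commit(records);
            wait.waitOptionally();
        }
    }

    public static List<String> demo() {
        List<String> processed = new ArrayList<>();
        KafkaConsumerRunner<String> runner = new KafkaConsumerRunner<>(
                () -> List.of("a", "b"),   // stub poll: a fixed batch
                processed::addAll,         // stub consume: record what was handled
                records -> { },            // no-op commit (auto-commit case)
                () -> { });                // no-op wait
        runner.runOnce();
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The consumer itself never branches on behavior; the caller composes it entirely from the strategies it is given.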

I'm confused about which approach to take for my context. This is an enterprise application, and although Kafka is used a lot, the frequency of change in its implementation is EXTREMELY rare.

Approach 1 depends on a boolean flag to drive the logic, which I feel is not extensible IF other use cases pop up, although again that is EXTREMELY rare.

Approach 2 is good, but the Sequential class ends up with too much logic: consume has its own logic and state, commit has its own logic and state, all in one class.

Approach 3 is what I'm leaning towards: I define the implementations and additionally extract the commit and consume logic into separate classes with abstractions. That feels like just the right amount of granularity I can live with.

Approach 4 is too granular: it forces my clients to understand which strategy does what, whereas most of the time clients just want to consume messages without worrying about the underlying logic.

1 Answer

This is a convoluted question

It is possible this answer misses the mark. I cannot singlehandedly distinguish whether your question is convoluted and conflates unrelated things; or whether I have misunderstood the premise of the question. My intuition is telling me the former, but I'm obviously biased towards my own understanding of things.

Honestly, I find your entire Kafka explanation confusing, in the sense that it obfuscates what (according to my interpretation of your question both in the title and the gist of your post) should be a more straightforward discussion on SRP. I get the feeling that you're conflating things that don't need to be considered in the same breath. When push comes to shove, this answer will err towards talking about SRP, not Kafka shenanigans, because it is my interpretation that you're asking to learn about SRP and the Kafka example is just an example, albeit a needlessly distracting one.

I'm no Java dev, but your code contains generic types in classes which are not generic (approach 2 has a class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> with a Map<String, List<ConsumerRecord<String, T>> property - what is T?), which makes me think you didn't really vet this code with a compiler to see whether it even works. The issue for me as an answerer is that your semantic question is convoluted, and your syntax (which often helps clarify things) is inconsistent, making it very hard to see what your core goal is.

Therefore, this is an answer on SRP, not on how to process Kafka messages in Java.


The point of SRP

First things first: the SRP was coined for cases where blatantly different things were being done, e.g. processing and logging. It wasn't really designed with very niche distinctions within particular domains (such as message queue parallelism) as the main focal point. The deeper you dig into splitting hairs, which IMO you're getting close to if not there already, the harder it gets to define responsibility boundaries, and that complexity scales exponentially.

It is a very common complaint about SRP that it is decidedly vague as to what constitutes a responsibility. You could read all there is to read about SRP and different people can come out of it with very different granularities/scopes as to what they consider a responsibility to be. For instance, you seem to be defining a responsibility as handling the entire logical flow of ingesting Kafka messages. The thing is, it is a subjective granularity, because context matters (as it always does).

Let's use the example of calculating the hypotenuse of a right angle triangle (i.e. the Pythagorean theorem). Initially you'll think of creating a HypotenuseCalculator which receives the lengths of the other two sides and outputs the hypotenuse. But when you think about it, that calculation is made up of different mathematical operations: square, addition, square root. Should these be separate responsibilities?

The answer is that context matters.

If you're building the next Wolfram Alpha (if you don't know it, it's the love child of Google and mathematical calculators), yeah you're going to want to define distinct mathematical operations because you're going to be composing them in myriad different ways and you want to promote reusability and the ability to optimize one particular calculation without needing to worry about impacting anything else that's unrelated.

If you're building an application where the only non-trivial mathematical operation you'll ever perform is that hypotenuse calculation, then no you don't need to subdivide it any further.
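Under that assumption, the whole responsibility lives in one place; a minimal sketch:

```java
// The "right" granularity for an app where this is the only non-trivial
// math: one calculator, no separate Square/Add/Sqrt components, because
// nothing else would ever reuse them.
public class HypotenuseCalculator {
    public static double hypotenuse(double a, double b) {
        return Math.sqrt(a * a + b * b);
    }

    public static void main(String[] args) {
        System.out.println(hypotenuse(3, 4)); // 5.0
    }
}
```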

A responsibility, at its core, is defined as the minimal granularity that is actually relevant for your scenario. Relevance is fairly vague, but I find that in a lot of cases it can be attributed to things like reusability, swappability, or avoiding what would otherwise be a completely heterogeneous mix of two algorithms that have no bearing on one another in any way.


Unrelated concerns

With the above in mind, let's start reducing your scenario to the parts that actually matter for SRP. First, trimming some fat:

Approach 1 is depending on a boolean flag to implement logic, which I feel like is not extendable IF other use cases pop up, although again EXTREMELY rare.

Boolean flags are a code smell, but not one inherently related to SRP and possible SRP violations. That's a separate topic.

Approach 4 is too granular, where I'm making my clients understand what strategy does what, where else most of the time the clients just want to consume the messages, not worrying about the underlying logic.

That's an issue of encapsulation (or lack thereof), not SRP.


The SRP parts of your question

The main gripe I have with this question is that it tries to apply SRP to a consideration on how to design a reusable contract for competing implementations of what should instead be considered a singular responsibility: emitting messages that need to be processed in a way that the consumer does not need to care about the order in which it is asked to process these messages. And that's why this question is so convoluted.

One thing that is very relevant to learning to apply SRP is found in something you didn't mention: how a given message gets processed. Not how you decide which messages to process when, but what "processing" a message actually means. This explanation is nowhere to be found in your question.

Ironically, this implies that you actually already understand SRP the way it is meant to be understood, because your omission of the processing logic shows that how to process a message is a separate responsibility from deciding which message should be processed, which is why you didn't add it into the question's context. That is the separation of responsibilities that SRP cares about. Effectively, the important thing for SRP is that you ensure that these two responsibilities don't mix in a single class.

How you design a singular responsibility (the message selection process) in a way that it can be implemented in multiple ways, so that another responsibility (the message processing logic) can work with any of them, is unrelated to SRP.
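That boundary can be sketched with a made-up pair of types (names invented here): the dispatcher owns *which* message is handled next, and MessageProcessor owns *how*:

```java
import java.util.ArrayList;
import java.util.List;

public class SelectionVsProcessing {
    // The "how a message is handled" responsibility.
    interface MessageProcessor<T> {
        void process(T message);
    }

    // The "which message is handled next" responsibility; this particular
    // policy is strictly in batch order, but the processor never knows that.
    static class SequentialDispatcher<T> {
        private final MessageProcessor<T> processor;

        SequentialDispatcher(MessageProcessor<T> processor) {
            this.processor = processor;
        }

        void dispatch(List<T> batch) {
            for (T message : batch) {
                processor.process(message); // delegates the "how"
            }
        }
    }

    public static List<String> demo() {
        List<String> handled = new ArrayList<>();
        new SequentialDispatcher<String>(handled::add)
                .dispatch(List.of("m1", "m2", "m3"));
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Either side can change (or be swapped out) without touching the other, which is the separation SRP actually cares about here.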


Parting advice

I can't solve your problems because of the conflated/convoluted nature of the question. But I do want to give you a tip on how to tackle the problem you're actually trying to solve.

I find that it often helps to not write implementations, only interfaces, when you are designing the interaction between two components/responsibilities in a way that this interaction should not be influenced by which implementation of a given interface you're using.

Forget what your different processing strategies are. If need be, physically remove them from your codebase. Just think about a generalized processing strategy. What would that interface need to look like? How would the consumer (i.e. the message processor) interact with it?
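As a hypothetical illustration of that exercise, such a contract might look like this, written before any implementation exists (every name here is invented for the sketch):

```java
import java.util.List;

// A generalized contract for "what happens to a polled batch", designed
// with no implementation in mind. Sequential, parallel, or any future
// strategy would all sit behind this one interface.
public interface ProcessingStrategy<T> {
    // The strategy receives a freshly polled batch and decides
    // ordering/concurrency internally.
    void process(List<T> batch);

    // Tells the caller whether it should commit offsets itself,
    // or leave committing to the strategy.
    boolean managesCommits();
}
```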

And then when that job is finished, forget all about what you did there; and start thinking only about how to write the first implementation of that interface. Then forget about that, and start thinking only about what the second implementation would look like. Lastly, revisit both your implementations and check if you invented the wheel twice, and if you can extract something meaningfully reusable from it (if so, abstract it away into a separate component).

But how the consumer interacts with a strategy should be designed and locked in¹ before you start thinking about the specific implementations.


¹ Be sensible about what I said. I'm not saying you're not allowed to fix a mistake you end up making in that interface design when you realize that down the line. What I'm saying is that the design should largely be finished before you start thinking about implementations and you should not let your design of the interface be influenced by your ideas on the implementation.
Doing those two things separately (one after the other) isn't a technical requirement, it's a tip to help keep you focused on one job when you find yourself conflating things if you try to do them at the same time.

  • Thank you for a very detailed answer. I agree that I was not able to articulate my question better. Kafka was not the main point in any way for my question, the main point was to get an idea on whether I should separate out different functionalities related to Kafka in different components, or should keep them at a single place. I tried to provide context to the question as well, as the main point of my question varies according to the context, but I guess I failed there. Commented Feb 24 at 7:11
