
I am using the KafkaConsumer 0.10 Java API. I want to consume from a specific partition and a specific offset. I looked it up and found that there is a seek() method, but it's throwing an exception. Has anyone had a similar use case or solution?

Code:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

Exception

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

3 Answers


Before you can seek() you first need to subscribe() to a topic or assign() partitions of a topic to the consumer. Also keep in mind that subscribe() and assign() are lazy -- thus, you also need to do a "dummy call" to poll() before you can use seek().

Note: as of Kafka 2.0, the new poll(Duration timeout) is async and it is not guaranteed that you have a complete assignment when poll() returns. Thus, you might need to check your assignment before using seek(), and poll again to refresh the assignment. (Cf. KIP-266 for details.)
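For example, a minimal sketch of the subscribe() path might look like this (topic name, group.id, and bootstrap servers are placeholders; on pre-2.0 clients a single dummy poll(0) is the usual idiom instead of the loop):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "my-group");                // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("mytopic"));

// subscribe() is lazy: the consumer only joins the group during poll().
// With Kafka 2.0+ poll(Duration) may return before the assignment is complete,
// so keep polling until partitions have actually been assigned.
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
}

// Only now is seek() legal on the assigned partitions.
for (TopicPartition tp : consumer.assignment()) {
    consumer.seek(tp, 4);
}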

If you use subscribe(), you use group management: thus, you can start multiple consumers using the same group.id and all partitions of the topic will be assigned evenly over all consumers within the group automatically (each partition will get assigned to a single consumer in the group).

If you want to read specific partitions, you need to use manual assignment via assign(). This allows you to do any assignment you want.
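For the use case in the question, a sketch with manual assignment could look like this (reusing consumerProps and the partition/offset from the question; imports as in the sketch above, plus org.apache.kafka.clients.consumer.ConsumerRecords):

TopicPartition partition = new TopicPartition("mytopic", 1);

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Collections.singletonList(partition)); // manual assignment, no group management
consumer.poll(Duration.ofMillis(0));                    // "dummy call"; per the comments below it may not be strictly required for assign()
consumer.seek(partition, 4);                            // the seek from the question now succeeds

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));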

Btw: KafkaConsumer has a very long and detailed class JavaDoc, including examples. It's worth reading.


5 Comments

Thanks. It worked :) with the combination of assign() and seek()
I think you mean group.id instead of application.id.
Answering too many #KafkaStream questions here... Thanks for pointing out @automaticgiant
@MatthiasJ.Sax "Also keep in mind that subscribe() and assign() are lazy -- thus, you also need to do a "dummy call" to poll() before you can use seek()." How does the laziness of subscribe() and assign() affect the code? What happens if I don't use a dummy poll like poll(0)?
I guess for assign() it's ok. But for subscribe(), as long as you don't call poll() you have not joined the consumer group and don't have any partitions assigned. Thus, you will end up with the exception shown in the question.

If you do not want to call poll() and retrieve records just to change the offset (Kafka version 0.11), try this:

...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List<TopicPartition> partitions = consumer.partitionsFor("Test_topic1").stream()
        .map(part -> new TopicPartition(part.topic(), part.partition()))
        .collect(Collectors.toList());
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000); // watch out for your local date and time settings
consumer.seekToBeginning(partitions); // or another seek variant

Poll for coordinator events. This ensures that the coordinator is known and that the consumer has joined the group (if it is using group management). This also handles periodic offset commits if they are enabled.



Please use consumer.assign() with consumer.seek(), and not consumer.subscribe().

After these changes, it will execute fine.
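A minimal version of the corrected snippet from the question might look like this (consumerProps as in the question; imports as in the sketches above):

TopicPartition tp = new TopicPartition("mytopic", 1);
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Collections.singletonList(tp)); // instead of subscribe()
consumer.seek(tp, 4);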

