
I defined a custom store for use in a custom Transformer (reference below).

https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

public class KafkaStream {
    public static void main(String[] args) {
        StateStoreSupplier houseStore = Stores.create("HOUSE")
                .withKeys(Serdes.String())
                .withValues(houseSerde)
                .persistent()
                .build();

        KStreamBuilder kstreamBuilder = new KStreamBuilder();
        kstreamBuilder.addStateStore(houseStore);
        // . . .
        KStream<String, String> testStream =
                kstreamBuilder.stream(Serdes.String(), Serdes.String(), "test");
        testStream.transform(HouseDetail::new, houseStore.name());
        // . . .
    }
}

class HouseDetail implements Transformer<String, String, KeyValue<String, House>> {

    private KeyValueStore<String, House> usageStore;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.usageStore = (KeyValueStore<String, House>) context.getStateStore("HOUSE");
    }
    // . . .
}


I get the following exception. I'm not sure why the internal topic "test_01-HOUSE-changelog" is being created with a single partition and a replication factor of 1, as opposed to the 2 partitions of the source topic "test". What am I missing here?

[2018-05-14 23:38:09,391] ERROR stream-thread [StreamThread-1] Failed to create an active task 0_1: (org.apache.kafka.streams.processor.internals.StreamThread:666)
org.apache.kafka.streams.errors.StreamsException: task [0_1] Store HOUSE's change log (test_01-HOUSE-changelog) does not contain partition 1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)


$ ./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
Topic:test  PartitionCount:2  ReplicationFactor:3  Configs:
    Topic: test  Partition: 0  Leader: 1001  Replicas: 1001,1002,1003  Isr: 1002,1001,1003
    Topic: test  Partition: 1  Leader: 1002  Replicas: 1002,1003,1001  Isr: 1002,1001,1003

$ ./kafka-topics.sh --zookeeper localhost:2181 --topic test_01-HOUSE-changelog --describe
Topic:test_01-HOUSE-changelog  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test_01-HOUSE-changelog  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001

Exception after disabling auto-topic creation on the broker:

[2018-05-17 14:25:41,114] ERROR stream-thread [StreamThread-1] Failed to create an active task 0_0: (org.apache.kafka.streams.processor.internals.StreamThread:666)
org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: test_01-HOUSE-changelog
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

1 Answer


If the topic exists with one partition, Kafka Streams will not automatically change the number of partitions. It's unclear from the information you provided why the topic was created with one partition. One possibility is that your input topic had one partition when you started your app the first time, and you added a second partition to the input topic later.

You need to clean up the application using the application reset tool as described in the docs (note that it's a two-step process): https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html
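A sketch of the two steps, assuming your application.id is "test_01" (inferred from the changelog topic name test_01-HOUSE-changelog; adjust the id and connection strings to your setup):

# Step 1: reset the application's internal state on the brokers
# (deletes internal topics such as test_01-HOUSE-changelog and
# resets input-topic offsets for the application's consumer group).
./kafka-streams-application-reset.sh \
    --application-id test_01 \
    --input-topics test \
    --bootstrap-servers localhost:9092 \
    --zookeeper localhost:2181

# Step 2: wipe the local state stores, either by calling
# KafkaStreams#cleanUp() in the app before streams.start(),
# or by deleting the local state directory (state.dir).

After the reset, restarting the app lets Streams recreate the changelog topic, now matching the input topic's 2 partitions.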


7 Comments

The input topic was created with 2 partitions originally. However, I removed the internal topic "test_01-HOUSE-changelog" and made sure it no longer appears when listing the topics. The input topic has 2 partitions per the listing above. I re-ran the streams app, but it again created the topic with a single partition. Is there anything else that could help determine why a single partition is being created?
Creating the internal topic manually makes the streams app run, but as the naming convention can change in the future, that would not be an option.
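For reference, the manual workaround mentioned above could look like the following (a sketch only: the partition count is chosen to match the input topic, the replication factor and cleanup policy are assumptions, and the internal topic name is version-dependent):

# Pre-create the changelog topic with the same partition count as "test".
# Changelog topics are log-compacted, hence cleanup.policy=compact.
./kafka-topics.sh --zookeeper localhost:2181 --create \
    --topic test_01-HOUSE-changelog \
    --partitions 2 \
    --replication-factor 3 \
    --config cleanup.policy=compact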
Hmmm... That's weird behavior. You might want to set a breakpoint in the InternalTopicManager#makeReady() method to get more insight. Another thought is about auto-topic creation: is it enabled broker-side? If yes, maybe the broker creates the topic incorrectly via this mechanism; in that case you should disable auto-topic creation (disabling it is recommended anyway).
Auto-topic creation was enabled; however, disabling it causes no internal topics to be created by the streams process at all.
Hmmm... If the topic cannot be created, the application should fail with an error -- did you check the logs?
