I'm writing a basic application to test the Interactive Queries feature of Kafka Streams. Here is the code:

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store");
        StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer());

        final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1");
        final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized = waypointsStream
                .mapValues(CustomSerdes::deserializeTruckDriverWaypoint)
                .filter((k, v) -> v.isPresent())
                .mapValues(Optional::get);

        waypointsDeserialized.groupByKey().aggregate(
                () -> 1,
                (aggKey, newWaypoint, aggValue) -> {
                    aggValue = aggValue + 1;
                    return aggValue;
                },
                Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store")
                        .withKeySerde(Serdes.ByteArray())
                        .withValueSerde(Serdes.Integer())
        );

        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties()));
        streams.cleanUp();
        streams.start();

        ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore());
        KeyValueIterator<byte[], Integer> range = keyValueStore.all();
        while (range.hasNext()) {
            KeyValue<byte[], Integer> next = range.next();
            System.out.println(next.value);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    protected static Properties createStreamsProperties() {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "random167");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
        return streamsConfiguration;
    }

So my problem is, every time I run this I get this same error:

Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, test-store, may have migrated to another instance.

I'm running only 1 instance of the application, and the topic I'm consuming from has only 1 partition.

Any idea what I'm doing wrong?

  • The exception message is quite generic. It's a known issue: issues.apache.org/jira/browse/KAFKA-5876. You should monitor the state of your application; stores can only be queried if the app is RUNNING: docs.confluent.io/current/streams/… Commented Mar 15, 2018 at 18:14
  • @MatthiasJ.Sax, I did indeed monitor the state of the KafkaStreams instance and made sure the store is only queried when the state is RUNNING. I still get the same error. Commented Mar 15, 2018 at 21:36
  • Not sure atm. As @kyle mentioned in his answer, in general you need to be aware that a store might become unavailable at any point in time, and thus you need to retry. Also note that on startup, KafkaStreams goes through a CREATED -> RUNNING -> REBALANCING -> RUNNING transition; thus, if you try to query as soon as you first see RUNNING, you will most likely get this exception, because a rebalance happens immediately afterwards. Commented Mar 15, 2018 at 22:22
  • Did you ever resolve this? I'm having the same issue, but only with streams that subscribe to multiple Confluent Connect sources. Despite the retry logic and the fact that the state is RUNNING, InvalidStateStoreException is always thrown for these streams when restarting the Streams app. If I wipe the offsets and stores and then start the app, it works fine. Commented Apr 24, 2018 at 14:10
  • @mike01010 No, I didn't resolve it. The exception thrown is not clear enough to identify the root cause of the issue. Commented Apr 25, 2018 at 13:50

4 Answers

Looks like you have a race condition. The Kafka Streams javadoc for KafkaStreams::start() says:

Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client. Because threads are started in the background, this method does not block.

https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html

You're calling streams.store() immediately after streams.start(), but I'd wager that the instance hasn't fully initialized yet at that point.

Since this code appears to be just for testing, add a Thread.sleep(5000) or something similar in there and give it a go (this is not a solution for production). Depending on your input rate into the topic, that will probably give the store a bit of time to start filling up with events, so that your KeyValueIterator actually has something to process/print.
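A somewhat less arbitrary alternative to a fixed sleep is to block until the instance reports RUNNING. With the real API you would call `streams.setStateListener(...)` before `streams.start()` (the listener can only be set while the instance is still in the CREATED state) and count down a `CountDownLatch` when the new state is `KafkaStreams.State.RUNNING`. The sketch below is stdlib-only so it runs without a broker: the `State` enum and the `onChange` method are hypothetical stand-ins that mirror the shape of `KafkaStreams.State` and `StateListener.onChange(newState, oldState)`; `RunningGate` is an illustrative name, not a Kafka class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class RunningGate {
    // Hypothetical stand-in for a subset of KafkaStreams.State (not the real enum).
    public enum State { CREATED, REBALANCING, RUNNING, NOT_RUNNING }

    // One-shot latch: opens the first time RUNNING is observed.
    private final CountDownLatch running = new CountDownLatch(1);

    // Same shape as KafkaStreams.StateListener#onChange(newState, oldState).
    public void onChange(State newState, State oldState) {
        if (newState == State.RUNNING) {
            running.countDown(); // release anyone blocked in awaitRunning
        }
    }

    // Returns true once RUNNING has been observed, false if the timeout elapses first.
    public boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        return running.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        RunningGate gate = new RunningGate();
        // Simulate the instance starting up on a background thread.
        Thread starter = new Thread(() -> {
            gate.onChange(State.REBALANCING, State.CREATED);
            gate.onChange(State.RUNNING, State.REBALANCING);
        });
        starter.start();
        System.out.println("running: " + gate.awaitRunning(5, TimeUnit.SECONDS));
        starter.join();
    }
}
```

Note that a one-shot latch opens on the first RUNNING transition. If startup goes CREATED -> RUNNING -> REBALANCING -> RUNNING, as described in the comments on the question, the latch can open just before a rebalance, so querying the store still needs to retry on InvalidStateStoreException.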

2 Comments

So what do you recommend doing in production?
Not an exhaustive list, but: a) separate the store client from the server, and b) add robust error handling to the client for every case where it can't reach the Kafka Streams server, for whatever reason. How your app handles those cases is up to you!

Probably not applicable to OP but might help others:

When retrieving a KTable's store, make sure the KTable's topic exists first, or you'll get this exception.

2 Comments

How can I check to see if the topic exists?
@emirhosseini please check the following question's answer

Typically this happens for two reasons:

1. The local KafkaStreams instance is not yet ready (i.e., not yet in runtime state RUNNING, see Run-time Status Information) and thus its local state stores cannot be queried yet.
2. The local KafkaStreams instance is ready (e.g. in runtime state RUNNING), but the particular state store was just migrated to another instance behind the scenes. This may notably happen during the startup phase of a distributed application or when you are adding/removing application instances.

https://docs.confluent.io/platform/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

The simplest approach is to guard against InvalidStateStoreException when calling KafkaStreams#store():

    // Example: Wait until the store of type T is queryable. When it is, return a reference to the store.
    public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                                  final QueryableStoreType<T> queryableStoreType,
                                                  final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
                // store not yet ready for querying
                Thread.sleep(100);
            }
        }
    }
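The loop above retries forever, which can hang the caller if the store never becomes available (for example, if the instance ends up in an error state). A bounded variant gives up after a deadline and only retries the "not ready" exception. This is a sketch under stated assumptions: it is written against `java.util.function.Supplier` so it compiles without Kafka on the classpath, and `StoreWaiter.awaitQueryable` is a hypothetical helper, not part of the Kafka Streams API. In a Kafka Streams app you would pass `() -> streams.store(storeName, queryableStoreType)` as the lookup and `InvalidStateStoreException.class` as the retryable type.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class StoreWaiter {
    // Hypothetical helper: calls lookup until it succeeds, retrying only on the given
    // exception type, sleeping `backoff` between attempts and giving up after `timeout`.
    public static <T> T awaitQueryable(Supplier<T> lookup,
                                       Class<? extends RuntimeException> retryable,
                                       Duration timeout,
                                       Duration backoff) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryable.isInstance(e)) {
                    throw e; // not a "store not ready" error: surface it immediately
                }
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // out of time: rethrow the last failure so the caller sees why
                }
                Thread.sleep(backoff.toMillis()); // store not yet ready for querying
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};
        // Simulated lookup that fails three times before the "store" becomes available.
        String store = awaitQueryable(() -> {
            if (++attempts[0] <= 3) {
                throw new IllegalStateException("store not ready");
            }
            return "test-store";
        }, IllegalStateException.class, Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(store + " after " + attempts[0] + " attempts"); // test-store after 4 attempts
    }
}
```

Rethrowing the last exception on timeout (rather than returning null) keeps the failure visible to the caller, which can then decide whether to fail the request or try another instance.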

I failed to call StoreBuilder before consuming the store.
