I'm writing a basic application to test the Interactive Queries feature of Kafka Streams. Here is the code:
public static void main(String[] args) { StreamsBuilder builder = new StreamsBuilder(); KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store"); StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer()); final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1"); final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized = waypointsStream .mapValues(CustomSerdes::deserializeTruckDriverWaypoint) .filter((k,v) -> v.isPresent()) .mapValues(Optional::get); waypointsDeserialized.groupByKey().aggregate( () -> 1, (aggKey, newWaypoint, aggValue) -> { aggValue = aggValue + 1; return aggValue; }, Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store").withKeySerde(Serdes.ByteArray()).withValueSerde(Serdes.Integer()) ); final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties())); streams.cleanUp(); streams.start(); ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore()); KeyValueIterator<byte[], Integer> range = keyValueStore.all(); while (range.hasNext()) { KeyValue<byte[], Integer> next = range.next(); System.out.println(next.value); } Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } protected static Properties createStreamsProperties() { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "random167"); streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Integer().getClass().getName()); //streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000); return streamsConfiguration; } So my problem is, every time I run this I get this same error:
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, test-store, may have migrated to another instance.
I'm running only 1 instance of the application, and the topic I'm consuming from has only 1 partition.
Any idea what I'm doing wrong ?