I have a topic which receives events with the following info:

key -> orderId (Integer)

value -> {"orderId" : aaa, "productId" : xxx, "userId" : yyy, "state" : "zzz"} (JSON with the whole info of the order)

I want to implement an interactive query to get the full order information by orderId. The idea is to be able to get the current state of an order from a materialized view (the Kafka Streams store).

First I create the KStream of the topic:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC);

Then I create a KTable to assign it to a store. The problem is that apparently I can only create stores where the value is an aggregation, for instance: stream.groupByKey().count("myStore");

The store I need should have the whole order information, not an aggregation. Is this possible?

1 Answer

You can read the topic directly as a KTable, too:

KTable<Integer, JsonNode> stream = kStreamBuilder.table(integerSerde, jsonSerde, STREAMING_TOPIC, "store-name-for-IQ"); 

This FAQ might also help: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

3 Comments

Hi Matthias, this is almost what I need. For simplicity I had omitted a step: after I create the stream, there is a filter operation to rule out some of the orders. That is, I have to create the KTable from a KStream, not directly from the topic. From the linked info, I guess I would have to use the dummy aggregation, right?
There's something else. I used the dummy aggregation: .groupByKey().reduce( (val1, val2) -> val2, "OrdersStore"); The problem is that there is a serialization error. How can I specify the value serialization as JSON? props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, XXXX);
Fixed it using a custom Serde: props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); public class JsonSerde implements Serde<JsonNode>{ private Serializer<JsonNode> serializer; private Deserializer<JsonNode> deserializer; ...
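
For anyone following the comments: the dummy aggregation reduce((val1, val2) -> val2, ...) works because the reducer always discards the old value and keeps the newest one, so the store ends up holding the latest full order per orderId. Below is a minimal plain-Java sketch of that behaviour. It is not Kafka Streams code: it only models the filter, group-by-key, and reduce steps with a HashMap. The Order class, the materialize helper, and the "CANCELLED" filter condition are hypothetical names chosen for illustration, not anything taken from the question.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

class LatestValueStoreSketch {

    // Hypothetical stand-in for the JSON order value (only two of its fields).
    static final class Order {
        final int orderId;
        final String state;

        Order(int orderId, String state) {
            this.orderId = orderId;
            this.state = state;
        }
    }

    // Models filter(...).groupByKey().reduce(reducer, storeName) with a plain map:
    // the first event for a key is stored as-is, later events are merged
    // into the stored value through the reducer.
    static Map<Integer, Order> materialize(List<Order> events,
                                           Predicate<Order> filter,
                                           BinaryOperator<Order> reducer) {
        Map<Integer, Order> store = new HashMap<>();
        for (Order event : events) {
            if (!filter.test(event)) {
                continue; // filtered-out orders never reach the store
            }
            // Assumption: the record key equals the orderId, as in the question.
            store.merge(event.orderId, event, reducer);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Order> events = Arrays.asList(
                new Order(1, "CREATED"),
                new Order(2, "CREATED"),
                new Order(1, "SHIPPED"),
                new Order(3, "CANCELLED"));

        Map<Integer, Order> store = materialize(
                events,
                order -> !"CANCELLED".equals(order.state), // hypothetical filter
                (oldValue, newValue) -> newValue);         // the "dummy aggregation"

        System.out.println(store.get(1).state);     // SHIPPED
        System.out.println(store.containsKey(3));   // false
    }
}
```

Looking up store.get(orderId) here plays the role of the interactive query: it returns the whole latest order rather than a count or sum.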
