2
votes

I am doing a PoC on Kafka Streams and KTables. I was wondering whether there is any way to store data (key-value or key-object pairs) in Kafka, through streams, KTables, or state stores, so that I can retrieve data based on both keys and values. I created a KStream on a topic, pushed some messages to it, and, following the word-count example, populated a KTable built on top of that KStream. Something like this:

StoreBuilder<KeyValueStore<String, Customer>> customerStateStore = Stores
        .keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"), Serdes.String(), customerSerde)
        .withLoggingEnabled(new HashMap<>());
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde))
        .to("customer-to-ktable-topic", Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic",
        Consumed.with(Serdes.String(), customerSerde), Materialized.as(customerStateStore.name()));

I am not able to fetch records based on values.

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KTable.html The Kafka documentation only lists a get(String key) function. Is there some other way to achieve a lookup by value?

2
FYI: The get() function you refer to is from kafka.apache.org/23/javadoc/org/apache/kafka/streams/state/…. This javadoc also lists other functions such as all(), as mentioned in Val Bonn's answer. - Michael G. Noll

2 Answers

1
votes

Your customerStateStore is a key-value store and, as you stated, you can only query it by key.

One option is to work on the incoming flow so that the value (or part of the value) is also used as a key in the store. You can do that with the map() method (or flatMap(), if each message should produce both entries). The idea would be to achieve something like:

Original IN msg: key1 - value1

Would generate 2 entries in the store:
    key1 - value1
    value1 - key1 (or whatever else, depending on your use case)

This way you can query the store on value1, because it is now a key. (Be careful if the input topic contains the same value for different keys: in a key-value store, the later entry would overwrite the earlier one.)
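To make the reverse-entry idea and the duplicate-value caveat concrete, here is a minimal plain-Java sketch. It is only a model: ordinary maps stand in for the Kafka Streams state store, and the class and method names (ReverseIndexSketch, put, keysFor) are hypothetical, not part of any Kafka API. It keeps a set of keys per value, so two keys sharing the same value do not overwrite each other.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model only: HashMaps stand in for the state store (assumption).
class ReverseIndexSketch {
    // Primary entries: key -> value, as in the original store.
    private final Map<String, String> byKey = new HashMap<>();
    // Reverse entries: value -> every key that currently maps to it.
    private final Map<String, Set<String>> keysByValue = new HashMap<>();

    // Called once per incoming message, like the mapping step in the topology.
    void put(String key, String value) {
        String previous = byKey.put(key, value);
        if (previous != null && !previous.equals(value)) {
            // Drop the stale reverse entry so lookups do not return outdated keys.
            Set<String> stale = keysByValue.get(previous);
            stale.remove(key);
            if (stale.isEmpty()) {
                keysByValue.remove(previous);
            }
        }
        keysByValue.computeIfAbsent(value, v -> new TreeSet<>()).add(key);
    }

    // Lookup by value: returns all keys currently holding that value.
    Set<String> keysFor(String value) {
        return keysByValue.getOrDefault(value, Collections.emptySet());
    }

    public static void main(String[] args) {
        ReverseIndexSketch index = new ReverseIndexSketch();
        index.put("key1", "value1");
        index.put("key2", "value1"); // same value under a second key
        System.out.println(index.keysFor("value1")); // prints [key1, key2]
    }
}
```

In a real topology the reverse entries would live in the state store itself (or in a second store), and the value type would need a serde that can hold several keys.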

Or, as an alternative to @NishuTayal's suggestion, you can loop over all the entries of the store and evaluate the values with the all() method: https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.html#all--

Obviously this will degrade performance, since every lookup scans the whole store. But depending on the size of your (in memory) store and your use case (get all the entries for a given value? only one entry? ...), it might not add too much delay to the processing.
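The full-scan approach can be sketched as follows. This is a plain-Java model under stated assumptions: a Map stands in for the ReadOnlyKeyValueStore, and the class and method names (ValueScanSketch, keysWithValue) are hypothetical. With the real store you would iterate the KeyValueIterator returned by all() and close it when done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model only: a Map stands in for the queryable store (assumption).
class ValueScanSketch {
    // Visits every entry and collects the keys whose value matches.
    // The cost grows linearly with the number of entries in the store.
    static List<String> keysWithValue(Map<String, String> store, String wanted) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, String> entry : store.entrySet()) {
            if (wanted.equals(entry.getValue())) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // TreeMap keeps keys sorted, so the result order is deterministic.
        Map<String, String> store = new TreeMap<>();
        store.put("a", "USA");
        store.put("b", "France");
        store.put("c", "USA");
        System.out.println(keysWithValue(store, "USA")); // prints [a, c]
    }
}
```

If only one matching entry is needed, the loop can return on the first hit instead of collecting all matches.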

But you have to be careful with the partitioning of your input topic: a given value may be present in several partitions of the topic, and therefore in the local stores of different instances of your Kafka Streams app.

0
votes

You can use the filter operation to do key-based or value-based lookups:

customerKTable.filter((key, value) -> !"USA".equals(value.get("country")))