1 vote

As part of our application logic, we use a Kafka Streams state store for range lookups; the data is loaded from a Kafka topic using the builder.table() method.

The problem is that the source topic's key is serialised as JSON, which is poorly suited to the binary key comparisons used internally in the RocksDB-based state store.

We were hoping to use a separate serde for keys by passing it to Materialized.as(). However, it looks like the Streams implementation resets whatever is passed there back to the original serdes used to read from the table topic.

This is what I can see in streams builder internals:

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed,
                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
    Objects.requireNonNull(topic, "topic can't be null");
    Objects.requireNonNull(consumed, "consumed can't be null");
    Objects.requireNonNull(materialized, "materialized can't be null");
    // Overwrites any serdes set on Materialized with the ones from Consumed:
    materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
    return internalStreamsBuilder.table(topic,
                                        new ConsumedInternal<>(consumed),
                                        new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
}

Does anybody know why it's done this way, and whether it's possible to use a different serde for a DSL state store?

Please don't propose using the Processor API; this route is well explored. I would like to avoid writing a processor and a custom state store every time I need to massage data before saving it into a state store.

After some digging through the Streams sources, I found out that I can pass a custom Materialized.as to a filter with an always-true predicate. But it feels a bit hacky.
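For reference, a minimal sketch of that filter-based workaround, with Serdes.String() standing in for the app's JSON and binary-comparable serdes, and illustrative topic/store names:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// The Consumed serdes are used to read the topic; the Materialized passed
// to filter() is NOT overwritten, so its store can use different serdes.
KTable<String, String> table = builder
    .table("table-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .filter((key, value) -> true, // always-true predicate: keeps every record
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("cache-store")
                .withKeySerde(Serdes.String())   // binary-comparable key serde would go here
                .withValueSerde(Serdes.String()));
```

The explicit type witness on Materialized.as is needed so the chained withKeySerde/withValueSerde calls type-check.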

This is my code, which unfortunately doesn't work as we hoped, because of the "serdes reset" described above:

Serde<Value> valueSerde = new JSONValueSerde();
KTable<Key, Value> table = builder.table(
    tableTopic,
    Consumed.with(new JSONKeySerde(), valueSerde),
    Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as(cacheStoreName)
        .withKeySerde(new BinaryComparisonsCompatibleKeySerde())
        .withValueSerde(valueSerde)
);
Thank you, Matthias, appreciate your quick response. Maybe I'm doing it the wrong way, but quite often I need to create different "indexes" over the same table topic data, to allow searching through it or to enable random access by a sub-key. What is the recommended solution for this? Should I just copy the data to an external DB in such cases? – Kotyara
We actually face a very similar issue to the one described here. We'd like to use a different Serde for the state store underlying a GlobalKTable, to avoid the unnecessary registration in the schema registry of a schema that is only used internally for the state store. So far the only option I can think of is creating a custom Serde that distinguishes the internal state store use case (no schema registry integration) from the external topic use case (which requires schema registry integration). But as you mentioned, this sounds really hacky... – msilb

1 Answer

0 votes

The code works by design. From a Streams point of view, there is no reason to use a different Serde for the store than for reading the data from the topic, because it's known to be the same data. Thus, if one does not use the default Serdes from the StreamsConfig, it's sufficient to specify the Serde once (in Consumed), and it's not required to specify it again in Materialized.

For your special case, you could read the topic as a stream and do a "dummy aggregation" that just returns the latest value per record (instead of computing an actual aggregate). This allows you to specify a different Serde for the result type.
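A sketch of that approach, with Serdes.String() standing in for the custom serdes from the question and illustrative topic/store names:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Read the topic as a stream with the topic's serdes, then reduce each key
// to its latest value; the Materialized store can use different serdes.
KTable<String, String> table = builder
    .stream("table-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue, // "dummy aggregation": keep latest value
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("cache-store")
                .withKeySerde(Serdes.String())   // store-only key serde goes here
                .withValueSerde(Serdes.String()));
```

One caveat: unlike a real table, records with null values are dropped by the aggregation rather than treated as deletes, so tombstones won't remove entries from the store.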