As part of our application logic, we use a Kafka Streams state store for range lookups. The data is loaded from a Kafka topic using the builder.table() method.
The problem is that the source topic's key is serialised as JSON, which is not well suited to the binary key comparisons used internally by the RocksDB-based state store.
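To illustrate the mismatch, here is a minimal sketch that uses only the JDK, no Kafka classes. The JSON layout `{"id":N}` and the class and method names are assumptions made for the example, since the real key format is not shown here. It compares keys as unsigned lexicographic bytes, which is roughly how a bytewise comparator orders them, and contrasts the JSON bytes with a fixed-width big-endian encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyOrderingSketch {

    // Hypothetical JSON key layout; stands in for whatever JSONKeySerde produces.
    static byte[] jsonKey(long id) {
        return ("{\"id\":" + id + "}").getBytes(StandardCharsets.UTF_8);
    }

    // Order-preserving encoding: big-endian long with the sign bit flipped,
    // so unsigned byte-wise comparison agrees with numeric order.
    static byte[] binaryKey(long id) {
        return ByteBuffer.allocate(Long.BYTES).putLong(id ^ Long.MIN_VALUE).array();
    }

    // Unsigned lexicographic comparison of two byte arrays (Java 9+).
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // JSON: the byte for '9' is greater than the byte for '1', so 9 sorts after 10.
        System.out.println(compareBytes(jsonKey(9), jsonKey(10)) < 0);     // false
        // Binary: 9 sorts before 10, as a range scan would expect.
        System.out.println(compareBytes(binaryKey(9), binaryKey(10)) < 0); // true
    }
}
```

With the JSON bytes, a range scan from 9 to 10 would see the bounds in the wrong order, which is why a separate, comparison-friendly key serde is wanted for the store.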
We were hoping to use a separate serde for keys by passing it to Materialized.as(). However, it looks like the streams implementation overwrites whatever serdes are passed there with the original serdes used to read the table topic.
This is what I can see in streams builder internals:
    public synchronized <K, V> KTable<K, V> table(final String topic,
                                                  final Consumed<K, V> consumed,
                                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(consumed, "consumed can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        // The serdes set on materialized are overwritten with those from consumed
        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
        return internalStreamsBuilder.table(topic,
                                            new ConsumedInternal<>(consumed),
                                            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
    }
Does anybody know why it's done this way, and whether it's possible to use a different serde for a DSL state store?
Please don't propose using the Processor API; that route is well explored. I would like to avoid writing a processor and a custom state store every time I need to massage data before saving it into a state store.
After some digging through the streams sources, I found out that I can pass a custom Materialized.as to a filter with an always-true predicate. But that smells a bit hacky.
This is my code, which unfortunately doesn't work as we hoped because of the "serdes reset" described above.
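    Serde<Value> valueSerde = new JSONValueSerde();
    KTable<Key, Value> table = builder.table(
        tableTopic,
        // serdes used to read the source topic
        Consumed.with(new JSONKeySerde(), valueSerde),
        // serdes intended for the state store; the explicit type arguments let the chained calls compile
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as(cacheStoreName)
            .withKeySerde(new BinaryComparisonsCompatibleKeySerde())
            .withValueSerde(valueSerde)
    );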
Serde for the state store underlying a GlobalKTable to avoid the unnecessary registration of the schema in the schema registry that is only used internally for the state store. So far the only option I can think of is creating a custom Serde to "catch" the internal state store use case (no schema registry integration) vs. the external topic use case (requires schema registry integration). But as you mentioned, this sounds really hacky... – msilb