In my Kafka Streams application, I use one topic for multiple object types, serialized as JSON. I use the class name as the key, and my idea was that consumers would filter only a subset of incoming entries by key and then deserialize them from JSON. I assumed that I could apply the initial filtering without defining serdes, but in that case the source stream is inferred as <Object,Object> and the following code does not compile:
return streamsBuilder.stream("topic")
    .filter((k, v) -> k.equals("TestClassA"))
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue,
        Materialized.<String, TestClassA, KeyValueStore<Bytes, byte[]>>as(StoreManager.STORE_NAME)
            .withKeySerde(Serdes.String())
            .withValueSerde(new JsonSerde<>(TestClassA.class)));
It compiles if I add types to the stream definition:
return streamsBuilder.stream(businessEntityTopicName, Consumed.with(Serdes.String(), new JsonSerde<>(TestClassA.class))) {...}
But in this case I get runtime exceptions when, for example, an object of TestClassB appears in the topic. What is the best practice for such cases, or should I just use different topics for different object types?
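To make the intent concrete, here is a minimal plain-Java sketch of the filter-then-deserialize idea, with no Kafka classes involved. `RawRecord` and `selectAndDecode` are hypothetical names made up for illustration, and the decoder is a stand-in that only turns the bytes into a String instead of parsing JSON. In Kafka Streams terms I assume this would correspond to reading the values as raw bytes (for example with `Serdes.ByteArray()`), filtering by key, and only then converting the value in a `mapValues` step.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only; no Kafka classes are used here.
class FilterBeforeDeserialize {

    // Hypothetical stand-in for a raw record: String key, undecoded value bytes.
    static final class RawRecord {
        final String key;
        final byte[] value;

        RawRecord(String key, String json) {
            this.key = key;
            this.value = json.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Keeps records whose key equals wantedKey and decodes only those values,
    // so payloads of other types never reach the decoder.
    static <T> List<T> selectAndDecode(List<RawRecord> records,
                                       String wantedKey,
                                       Function<byte[], T> decoder) {
        List<T> decoded = new ArrayList<>();
        for (RawRecord record : records) {
            if (wantedKey.equals(record.key)) {
                decoded.add(decoder.apply(record.value));
            }
        }
        return decoded;
    }

    public static void main(String[] args) {
        List<RawRecord> topic = List.of(
                new RawRecord("TestClassA", "{\"id\":1}"),
                new RawRecord("TestClassB", "{\"name\":\"x\"}"),
                new RawRecord("TestClassA", "{\"id\":2}"));

        // Stand-in decoder: a real one would parse the JSON into TestClassA.
        List<String> onlyA = selectAndDecode(topic, "TestClassA",
                bytes -> new String(bytes, StandardCharsets.UTF_8));

        System.out.println(onlyA);
    }
}
```

The point of the sketch is the ordering: the key check happens on the undecoded record, so a TestClassB payload is dropped before any TestClassA-specific deserialization is attempted.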