2
votes

In my kafka streams application, I use one topic for multiple object types, seriazliazed as JSON. I use class name as a key and my idea was that consumers will filter only a subset of incoming entries by key and deserialize them from JSON. I assumed that I can apply initial filtering without defining serdes, but in such case source stream is inferred to <Object,Object> and the following code does not compile:

 return streamsBuilder.stream("topic")
            .filter((k, v) -> k.equals("TestClassA"))
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue,
                    Materialized.<String, TestClassA, KeyValueStore<Bytes, byte[]>>as(StoreManager.STORE_NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TestClassA.class)));

It compiles if I add types to stream definition:

return streamsBuilder.stream(businessEntityTopicName, Consumed.with(Serdes.String(), new JsonSerde<>(TestClassA.class))) {...}

But in this case I get runtime exceptions when for example object of TestClassB appears in a topic. What is the best practice for such cases or should I just use different topics for different objects?

2

2 Answers

3
votes

If you don't specify any Serde in #stream() and don't overwrite the default from StreamsConfig Kafka Streams will use byte-array serdes. Thus, you would get

KStream<byte[], byte[]> streams = builder.<byte[], byte[]>stream("topicName");

Note, that Java itself falls back to KStream<Object, Object> if you don't specify the correct type on the right hand side as shown above. But the actual type at runtime would be byte[] for both cases.

Thus, you could apply a filter, but it would need to work on byte[] data type.

I guess, what you actually want to do is to only specify a StringSerde for the key:

KStream<String, byte[]> streams = builder.<String, byte[]>("topicName", Consumed.with(Serdes.String(), null)); // null with fall back to defaul Serde from StreamConfig

This allows you to apply your filter() based on String keys before the groupByKey() operation.

0
votes

I have a similar use case. I make all the possible objects inherit a common interface (Event) and annotate with @JsonTypeInfo so jackson can deserialize the object properly.

streamsBuilder.stream("topic")//need to add some sort of JSONSerde<Event> to this stream call, i use personally use the one bundled with spring
            .filter((k, v) -> v instanceOf testClassA)