I have an issue with Kafka Streams (0.10.1.1). I'm trying to create a KStream and a KTable on the same topic.
The first approach I tried was simply calling the KStreamBuilder methods stream() and table() on the same topic. This resulted in
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.
OK, this seems to be a restriction built into Kafka Streams.
My second approach was to create a KTable first and call the toStream() method on it. The problem is that KTables do some internal buffering/flushing, so the output stream does not reflect every input element when a key occurs multiple times, as in my example. That matters because I'm counting the occurrences of each key.
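To make the symptom concrete, here is a toy model in plain Java. It is not Kafka Streams code, and the class and method names (BufferedTableModel, emitWithBuffer, flushEvery) are made up for illustration. It assumes the buffer simply keeps the latest value per key until it is flushed, which is how I understand the behaviour I'm seeing:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BufferedTableModel {

    // Toy stand-in for a table-side buffer: it keeps only the latest value
    // per key until a flush happens. This is NOT the Kafka Streams implementation.
    static List<String> emitWithBuffer(List<String[]> records, int flushEvery) {
        Map<String, String> buffer = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        int seen = 0;
        for (String[] record : records) {
            buffer.put(record[0], record[1]); // a later value overwrites an earlier one
            seen++;
            if (seen % flushEvery == 0) {
                flush(buffer, emitted);
            }
        }
        flush(buffer, emitted); // final flush of whatever is left
        return emitted;
    }

    // Forwards one "key=value" entry per buffered key, then empties the buffer.
    private static void flush(Map<String, String> buffer, List<String> emitted) {
        for (Map.Entry<String, String> e : buffer.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        List<String[]> input = new ArrayList<>();
        input.add(new String[] {"a", "1"});
        input.add(new String[] {"a", "2"});
        input.add(new String[] {"b", "1"});
        input.add(new String[] {"a", "3"});

        // Flushing after every record forwards all four updates.
        System.out.println(emitWithBuffer(input, 1)); // [a=1, a=2, b=1, a=3]
        // Flushing only once collapses the three updates for "a" into one.
        System.out.println(emitWithBuffer(input, 4)); // [a=3, b=1]
    }
}
```

With the single flush, anything counting downstream sees key "a" once instead of three times, which is the kind of undercount I get.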
The approach that seems to work is to create a KStream first, group it by key, and then "reduce" it by discarding the old aggregate and keeping only the new value. I'm not too happy with this approach because a) it seems very complicated and b) the Reducer interface does not specify which argument is the already aggregated value and which is the new one. I went with convention and kept the second one, but ... meh.
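The "keep only the new value" reduction can be sketched in plain Java as below. This is a stand-in rather than the Streams API: it uses java.util.Map.merge, whose remapping function is documented to receive the existing value first and the new value second. The names KeepLatestReduce, KEEP_LATEST and latestPerKey are made up for this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class KeepLatestReduce {

    // Plain-Java stand-in for the "discard the old aggregate" reducer.
    // Map.merge passes (existing value, new value), so returning the
    // second argument keeps the most recent value.
    static final BinaryOperator<String> KEEP_LATEST = (oldValue, newValue) -> newValue;

    // Builds a table-like view holding the latest value seen for each key.
    static Map<String, String> latestPerKey(List<String[]> records) {
        Map<String, String> table = new HashMap<>();
        for (String[] record : records) {
            table.merge(record[0], record[1], KEEP_LATEST);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
            new String[] {"a", "1"},
            new String[] {"a", "2"},
            new String[] {"b", "1"});
        System.out.println(latestPerKey(input).get("a")); // prints 2
    }
}
```

In my Streams code the lambda has the same shape. The difference is that there I'm only assuming the second argument is the new value, whereas Map.merge states the order explicitly.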
So the question boils down to: is there a better way? Am I missing something blindingly obvious?
Please keep in mind that I'm not working on a proper use case – this is just me getting to know the Streams-API.