8 votes

I have an issue with Kafka Streams (0.10.1.1). I'm trying to create a KStream and a KTable on the same topic.

The first approach I tried was simply calling the KStreamBuilder methods for stream and table on the same topic. This resulted in
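For reference, a minimal reconstruction of that first attempt (0.10.1.x API; topic and store names are placeholders, default serdes assumed):

```java
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("my-topic");
// Registering the same topic a second time throws TopologyBuilderException:
KTable<String, String> table = builder.table("my-topic", "my-store");
```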

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.

OK, this seems to be some restriction built-into Kafka Streams.

My second approach was initially creating a KTable and using the toStream() method on it. This has the issue that KTables do some internal buffering/flushing, so the output stream does not reflect all input elements if a key is occurring multiple times as in my example. This is a problem as I'm counting the occurrences of a key.
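Sketched, the second approach looked roughly like this (again 0.10.1.x API, placeholder names):

```java
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> table = builder.table("my-topic", "my-store");
// Because of the KTable's internal caching, this stream may only see the
// latest value per key at flush time, not every input record:
KStream<String, String> stream = table.toStream();
```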

The approach that seems to work is to initially create a KStream, group it by key and then "reduce" it by discarding the old aggregate and just keeping the new value. I'm not too happy with this approach as a) it seems very complicated and b) the Reducer interface does not specify which one is the already aggregated value and which one is the new one. I went with convention and kept the second one, but ... meh.
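A sketch of that workaround, assuming String keys and values and placeholder topic/store names:

```java
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("my-topic");
// Emulate a table: discard the old aggregate, keep only the newest value per key.
KTable<String, String> table = stream
    .groupByKey()
    .reduce((aggValue, newValue) -> newValue, "table-store");
```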

So the question boils down to: is there a better way? Am I missing something blindingly obvious?

Please keep in mind that I'm not working on a proper use case – this is just me getting to know the Streams-API.


1 Answer

7 votes

About adding a topic twice: that's not possible, because a Kafka Streams application is a single "consumer group" and thus can only commit offsets for a topic once. Adding the same topic twice would mean the topic gets consumed twice, with independent progress.

For the KTable#toStream() approach, you can disable caching by setting the StreamsConfig parameter cache.max.bytes.buffering to 0. However, this is a global setting and disables caching/deduplication for all KTables (cf. http://docs.confluent.io/current/streams/developer-guide.html#memory-management).
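A config sketch (application id and bootstrap servers are placeholders):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");               // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
// Disable record caches globally -- this affects all KTables in the topology:
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```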

Update: Since Kafka 0.11 it's possible to disable caching for each KTable individually via Materialized parameter.
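With the newer API, that looks roughly like the following (topic and store names are placeholders):

```java
KTable<String, String> table = builder.table(
    "my-topic",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("my-store")
        .withCachingDisabled());
```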

The groupBy approach works, too, even if it requires some boilerplate. We are considering adding KStream#toTable() to the API to simplify this transformation. And yes, the second argument in reduce is the new value -- as reduce is for combining two values, the API has no concept of "old" and "new", and thus the parameters do not have such naming.