13
votes

Is there functionality built into Kafka Streams for dynamically routing a single input stream to multiple output topics? KStream.branch allows branching based on true/false predicates, but this isn't quite what I want. I'd like each incoming log to determine, at runtime, the topic it will be streamed to. For example, a log {"date": "2017-01-01"} would be streamed to the topic topic-2017-01-01, and a log {"date": "2017-01-02"} would be streamed to the topic topic-2017-01-02.

I could call KStream.foreach on the stream and write each record to a Kafka producer, but that doesn't seem very elegant. Is there a better way to do this within the Streams framework?

1
What do you mean by "based on a string" -- btw: KStream.branch takes multiple predicates (your question indicates you missed this). So branch should allow you to do what you want. Maybe you can give a data example? - Matthias J. Sax
I should be more clear. I'm aware that it takes multiple predicates--that would be a fine solution if I had a fixed number of topics I wanted to stream to. However, what I'm looking to do is to write to topics named foo-{date}. - kellanburket

1 Answer

5
votes

If you want to create topics dynamically based on your data, the Kafka Streams API does not support this at the moment (v0.10.2 and earlier). You will need to create a KafkaProducer and implement the dynamic "routing" yourself (for example, using KStream#foreach() or KStream#process()). Note that you need to do synchronous writes to avoid data loss, because Streams does not track the acknowledgements of a producer you create yourself; unfortunately, synchronous writes do not perform very well. There are plans to extend the Streams API with dynamic topic routing, but there is no concrete timeline for this feature right now.
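
As a rough illustration of the manual routing described above, here is a minimal sketch using only the JDK. `DateTopicRouter`, `SyncSender`, and the regex-based date extraction are hypothetical names and choices made for this example, not part of Kafka; in a real application you would likely parse the record with a proper JSON library and plug a blocking `KafkaProducer` send in as the `SyncSender`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: picks a topic per record and sends with a blocking call.
final class DateTopicRouter {

    /**
     * Hypothetical abstraction over a blocking send. With a real producer this could be
     * (topic, value) -> producer.send(new ProducerRecord<>(topic, value)).get();
     * where get() waits for the broker acknowledgement (the synchronous write).
     */
    interface SyncSender {
        void send(String topic, String value) throws Exception;
    }

    // Assumption for this sketch: the record is flat JSON with a "date" field in yyyy-MM-dd form.
    private static final Pattern DATE_FIELD =
        Pattern.compile("\"date\"\\s*:\\s*\"(\\d{4}-\\d{2}-\\d{2})\"");

    private final String prefix;
    private final SyncSender sender;

    DateTopicRouter(String prefix, SyncSender sender) {
        this.prefix = prefix;
        this.sender = sender;
    }

    /** Derives the destination topic name from the record's date field. */
    static String topicFor(String prefix, String json) {
        Matcher m = DATE_FIELD.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no date field in: " + json);
        }
        return prefix + m.group(1);
    }

    /** Routes one record; a failed send surfaces as an exception instead of being dropped. */
    void route(String json) {
        String topic = topicFor(prefix, json);
        try {
            sender.send(topic, json);
        } catch (Exception e) {
            throw new RuntimeException("send to " + topic + " failed", e);
        }
    }
}
```

Inside the topology, this could be wired up as `stream.foreach((key, value) -> router.route(value))`. Letting the exception propagate is a deliberate choice in this sketch: it stops processing rather than silently losing the record.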

There is one more consideration you should take into account. If you do not know your destination topic(s) ahead of time and just rely on the so-called "topic auto creation" feature, you should make sure that those topics are created with the desired configuration settings (e.g., number of partitions or replication factor).

As an alternative to "topic auto creation", you can also use the Admin Client (available since v0.10.1) to create topics with the correct configuration. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations
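
One way to combine explicit topic creation with dynamic routing is to create each destination topic the first time it is seen, before the first write to it. The sketch below uses only the JDK; `TopicEnsurer` and `TopicCreator` are hypothetical names for this example, and the comment showing the Java `AdminClient` call assumes a client version that ships that class.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Kafka: creates each destination topic once, with explicit settings.
final class TopicEnsurer {

    /**
     * Hypothetical hook for the actual creation call. With the Java AdminClient it could be
     * (name, p, rf) -> admin.createTopics(
     *     Collections.singleton(new NewTopic(name, p, rf))).all().get();
     */
    interface TopicCreator {
        void create(String name, int partitions, short replicationFactor) throws Exception;
    }

    // Topics this instance has already created (or at least attempted successfully).
    private final Set<String> known = ConcurrentHashMap.newKeySet();
    private final int partitions;
    private final short replicationFactor;
    private final TopicCreator creator;

    TopicEnsurer(int partitions, short replicationFactor, TopicCreator creator) {
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.creator = creator;
    }

    /** Returns true if this call triggered creation, false if the topic was already seen. */
    boolean ensure(String topic) {
        if (!known.add(topic)) {
            return false;
        }
        try {
            creator.create(topic, partitions, replicationFactor);
            return true;
        } catch (Exception e) {
            // Forget the topic so that a later call retries the creation.
            known.remove(topic);
            throw new RuntimeException("could not create topic " + topic, e);
        }
    }
}
```

Calling `ensure(topic)` right before each send keeps the settings under your control instead of the broker defaults. A real `TopicCreator` would probably also need to tolerate the case where the topic already exists from an earlier run, since the in-memory set is empty after a restart.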