1
votes

I want to send every record in a Kafka stream to a different topic based on the message key. For example, a stream contains a name as the key and a record as the value; I want to fan these records out to different topics based on the key.

data : (jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), expected

  • topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
  • topic2 :sean-> (sean -> {seansRecord})
  • topic3 :mary -> (mary -> {marysRecord})

Below is the way I am doing this right now, but since the list of names is huge this is slow. Also, even if there are records for only a few names, I still have to traverse the entire list. Please suggest a fix.

    for( String name : names )
    {
        recordsByName.filter(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 
Note: This will potentially end up with a bunch of topics (with default settings). What is the use case here? - OneCricketeer
@cricket_007 Thanks for pointing that out, but that's the intention here. There are hundreds of records per name, and each of them needs to be processed and aggregated individually. - Count

3 Answers

5
votes

I think you should use the KStream::to(final TopicNameExtractor<K, V> topicExtractor) overload. It gives you the ability to compute the topic name for each individual record.

Sample code:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);
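One caveat: Kafka topic names are restricted to the characters `a-zA-Z0-9._-`, so if keys can contain anything else, sanitize them inside the extractor. A minimal sketch of such a mapping as a plain helper (the `user-` prefix and the replacement rule are assumptions for illustration):

```java
public class TopicNames {
    // Maps a record key to a legal Kafka topic name: lowercases the key,
    // replaces any character outside [a-zA-Z0-9._-] with '-', and adds a
    // (hypothetical) "user-" prefix to keep these topics in one namespace.
    public static String topicFor(String key) {
        return "user-" + key.toLowerCase().replaceAll("[^a-zA-Z0-9._-]", "-");
    }
}
```

With a helper like this, the routing line becomes `stream.to((key, value, recordContext) -> TopicNames.topicFor(key));`.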
1
votes

I think what you're looking for is KStream#branch.

The following is untested, but it shows the general idea:

// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate<String, Object>[] predicates = names.stream()
    .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
    .toArray(Predicate[]::new); // unchecked, but safe: every element matches the declared type

// example input
final KStream<String, Object> stream = new StreamsBuilder().<String, Object>stream("names");

// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
    branches[i].to(names.get(i));
}

// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
// ...
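One thing to keep in mind: branch routes each record to the first predicate that matches and silently drops records that match none, which matters if new names can appear at runtime. (In newer Kafka Streams releases this pattern is expressed with KStream#split instead, but the routing rule is the same.) A plain-Java sketch of that rule, with Map buckets standing in for topics purely for illustration:

```java
import java.util.*;

public class FanOutDemo {
    // Routes each (key, value) record to a per-name bucket, mimicking
    // branch(): only keys present in `names` have a matching predicate;
    // records whose key matches no predicate are dropped.
    public static Map<String, List<String>> fanOut(
            List<Map.Entry<String, String>> records, List<String> names) {
        Map<String, List<String>> topics = new LinkedHashMap<>();
        for (String name : names) topics.put(name, new ArrayList<>());
        for (Map.Entry<String, String> rec : records) {
            List<String> bucket = topics.get(rec.getKey());
            if (bucket != null) bucket.add(rec.getValue()); // unmatched key: dropped
        }
        return topics;
    }
}
```

Running this over the question's sample data puts both of jhon's records in the "jhon" bucket while sean and mary each get one, matching the expected fan-out.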
1
votes

If you need to generate aggregate data for each user, you don't need to write to a separate topic per user. You'd be better off writing an aggregate on the source stream. This way you won't end up with one topic per key, but you can still run operations on each user independently.

Serde<UserRecord> recordSerde = ...
KStream<String, UserAggregate> aggregateByName = recordsByName
   .groupByKey(Grouped.with(Serdes.String(), recordSerde))
   .aggregate(...)
   .toStream();

See https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating for details

This approach will scale to millions of users, something you won't be able to achieve with the one-topic-per-user approach.