I have a Kafka Streams app that groups incoming messages by several values. For example, given this message:
{ "gender": "female", "location": "canada", "age-group": "25-30" }
Topology:
table
.groupBy((key, value) -> groupByGender) // example key: female
.count("gender-counts");
table
.groupBy((key, value) -> groupByLocation) // example key: canada
.count("location-counts");
table
.groupBy((key, value) -> groupByAgeGroup) // example key: 25-30
.count("age-group-counts");
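To make concrete what each of the three `count` calls above maintains, here is a stdlib-only Java model (not Kafka Streams code). Each `TreeMap` stands in for one count store, which in the real app means one changelog topic plus one repartition topic. Messages are modeled as `Map<String, String>`, and `countBy` plus the sample records (including the `31-35` bucket) are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only model of the three separate groupBy/count chains.
// Not Kafka Streams code: each TreeMap stands in for one count store
// (and therefore one changelog + one repartition topic in the real app).
class SeparateCounts {

    // Hypothetical helper: count messages by the value of one field.
    // Assumes every message carries the field (TreeMap rejects null keys).
    static Map<String, Long> countBy(List<Map<String, String>> messages, String field) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map<String, String> msg : messages) {
            counts.merge(msg.get(field), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample messages in the same shape as the example above.
        List<Map<String, String>> messages = List.of(
            Map.of("gender", "female", "location", "canada", "age-group", "25-30"),
            Map.of("gender", "male", "location", "canada", "age-group", "31-35"),
            Map.of("gender", "female", "location", "usa", "age-group", "25-30"));

        // One independent "store" per dimension.
        System.out.println(countBy(messages, "gender"));    // {female=2, male=1}
        System.out.println(countBy(messages, "location"));  // {canada=2, usa=1}
        System.out.println(countBy(messages, "age-group")); // {25-30=2, 31-35=1}
    }
}
```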
This results in lots of topics:
my-consumer-gender-counts-changelog
my-consumer-gender-counts-repartition
my-consumer-location-counts-changelog
my-consumer-location-counts-repartition
my-consumer-age-group-counts-changelog
my-consumer-age-group-counts-repartition
It would be nice if we could send multiple aggregations to a single state store and include the group-by value as part of the key. For example:
table
.groupBy((key, value) -> groupByGender) // example key: female_gender
.count("counts");
table
.groupBy((key, value) -> groupByLocation) // example key: canada_location
.count("counts");
table
.groupBy((key, value) -> groupByAgeGroup) // example key: 25-30_age_group
.count("counts");
This would result in far fewer topics:
my-consumer-counts-changelog
my-consumer-counts-repartition
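If several dimensions share one store, the key has to carry both the grouping value and the dimension so the counts can be told apart (and read back) later. Below is a minimal sketch of that key scheme, using the `<value>_<dimension>` format from the example above. The class and method names are hypothetical, and it assumes grouping *values* never contain `_`, so the first `_` always separates value from dimension (the dimension suffix itself may contain `_`, as in `age_group`).

```java
// Hypothetical helpers for the composite-key scheme
// ("female_gender", "canada_location", "25-30_age_group").
// Assumption: grouping values never contain '_', so the FIRST '_'
// splits value from dimension; the dimension may contain '_'.
class CompositeKey {

    // Build a key such as "25-30_age_group".
    static String of(String value, String dimension) {
        if (value.contains("_")) {
            throw new IllegalArgumentException("value must not contain '_': " + value);
        }
        return value + "_" + dimension;
    }

    // Everything before the first '_' is the grouping value.
    static String value(String key) {
        return key.substring(0, key.indexOf('_'));
    }

    // Everything after the first '_' is the dimension.
    static String dimension(String key) {
        return key.substring(key.indexOf('_') + 1);
    }

    public static void main(String[] args) {
        String key = of("25-30", "age_group");
        System.out.println(key);            // 25-30_age_group
        System.out.println(value(key));     // 25-30
        System.out.println(dimension(key)); // age_group
    }
}
```

Whatever separator is chosen, it is worth validating on the write path as above, since a value that happens to contain the separator would silently produce keys that parse into the wrong dimension.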
This currently doesn't appear to be possible (using the DSL, anyway). The `groupBy` operator creates an internal topic for repartitioning, so if multiple sub-topologies group by different things but use the same store name, Kafka Streams attempts to register the same repartition topic from multiple sources. This results in the following error:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic counts-repartition has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.validateTopicNotAlreadyRegistered(TopologyBuilder.java:518)
If `groupBy` could return more than one record (as `flatMap` does), we could return a collection of records, one for each grouping, but this doesn't seem to be possible with the DSL either.
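`KStream` (unlike `KTable`) does have a `flatMap` operator, so if the input can be consumed as a stream, one option worth evaluating is `flatMap` to one composite-key record per grouping followed by `groupByKey().count(...)`, which should need only one repartition topic and one store. The caveat is that `KTable#groupBy` subtracts a row's old value when it is updated; going through `toStream()` and counting events gives that up, so this only fits if updates to the same input key don't need to be retracted. The sketch below models just the fan-out and single-table counting in plain Java (not Kafka Streams code); `DIMENSIONS`, `groupingKeys`, and `countAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only model of a "flatMap-style groupBy": each message fans out into
// one composite key per grouping, and every key is counted in ONE table.
// Not Kafka Streams code; the TreeMap stands in for a single count store.
class FanOutCounts {

    // Message field -> dimension suffix used in the key (assumed mapping).
    static final Map<String, String> DIMENSIONS = Map.of(
        "gender", "gender",
        "location", "location",
        "age-group", "age_group");

    // One message -> one composite key per grouping it carries,
    // e.g. female_gender, canada_location, 25-30_age_group (order not guaranteed).
    static List<String> groupingKeys(Map<String, String> msg) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> dim : DIMENSIONS.entrySet()) {
            String value = msg.get(dim.getKey());
            if (value != null) { // skip dimensions missing from this message
                keys.add(value + "_" + dim.getValue());
            }
        }
        return keys;
    }

    // Count every fanned-out key in a single table.
    static Map<String, Long> countAll(List<Map<String, String>> messages) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map<String, String> msg : messages) {
            for (String key : groupingKeys(msg)) {
                counts.merge(key, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map<String, String>> messages = List.of(
            Map.of("gender", "female", "location", "canada", "age-group", "25-30"),
            Map.of("gender", "female", "location", "usa", "age-group", "25-30"));
        // prints {25-30_age_group=2, canada_location=1, female_gender=2, usa_location=1}
        System.out.println(countAll(messages));
    }
}
```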
My question is: given a single record that can be grouped by multiple values (e.g. `{ "gender": "female", "location": "canada", "age-group": "25-30" }`), should the creation of multiple topics (2 for each grouping) ever be a concern (e.g. what if we had 100 different groupings)? Are there other strategies that might be a better fit when a single record can be grouped by several values? Is what I'm proposing (sinking multiple aggregations to a single changelog topic) a bad idea, even when the number of unique keys is very low?
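For a sense of scale, here is the back-of-the-envelope arithmetic for the two layouts in the question: one repartition and one changelog topic per `groupBy(...).count(...)` chain, versus a single shared chain. The partition count per topic is a hypothetical placeholder; substitute your real value.

```java
// Back-of-the-envelope internal-topic count for the two layouts.
class TopicMath {

    // Each groupBy(...).count(...) chain adds one repartition + one changelog topic.
    static final int TOPICS_PER_GROUPING = 2;

    // One chain per grouping, each with its own store name.
    static int separateTopics(int groupings) {
        return TOPICS_PER_GROUPING * groupings;
    }

    // All groupings share one chain/store via composite keys.
    static int combinedTopics() {
        return TOPICS_PER_GROUPING;
    }

    public static void main(String[] args) {
        int groupings = 100;
        int partitionsPerTopic = 8; // hypothetical; use your real partition count
        System.out.println(separateTopics(groupings));                      // 200 topics
        System.out.println(separateTopics(groupings) * partitionsPerTopic); // 1600 partitions
        System.out.println(combinedTopics() * partitionsPerTopic);          // 16 partitions
    }
}
```

Note that merging the groupings by emitting one composite-key record per grouping still turns each input record into one repartitioned record per grouping, so the combined layout trims topic and partition overhead rather than the volume of data being repartitioned.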