I have a KTable with data that looks like this (key => value), where keys are customer IDs, and values are small JSON objects containing some customer data:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
I'd like to do some aggregations on this KTable, and basically keep a count of the number of records for each age_group. The desired KTable data would look like this:
"18-24" => 3
"25-30" => 1
Let's say Alice, who is in the 18-24 group above, has a birthday that puts her in the next age group (25-30). The state store backing the first KTable should now look like this:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
And I'd like the resulting aggregated KTable to reflect this, e.g.:
"18-24" => 2
"25-30" => 2
I may be overgeneralizing the issue described here:
"In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue"
But I have only been able to calculate a running total so far, e.g. Alice's birthday would be interpreted as:
"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well
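To make the semantics I'm after concrete, here is a minimal plain-Java sketch with no Kafka Streams involved. The class name AgeGroupCountModel and its upsert method are hypothetical, made up purely for illustration. It only models the behavior I want from the aggregated KTable: when a customer's record changes, the old age group is decremented before the new one is incremented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (hypothetical, no Kafka dependency) of a per-group count that follows table updates. */
class AgeGroupCountModel {
    // Models the source KTable: customer ID -> current age group.
    private final Map<Integer, String> table = new HashMap<>();
    // Models the aggregated KTable: age group -> number of customers (sorted for stable printing).
    private final Map<String, Long> counts = new TreeMap<>();

    /** Insert or update a customer, retracting the old group's count first. */
    void upsert(int customerId, String ageGroup) {
        String previous = table.put(customerId, ageGroup);
        if (previous != null) {
            // Subtract: the old record no longer belongs to its former group.
            counts.merge(previous, -1L, Long::sum);
        }
        // Add: count the record under its current group.
        counts.merge(ageGroup, 1L, Long::sum);
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        AgeGroupCountModel model = new AgeGroupCountModel();
        model.upsert(1, "25-30"); // John
        model.upsert(2, "18-24"); // Alice
        model.upsert(3, "18-24"); // Susie
        model.upsert(4, "18-24"); // Jerry
        System.out.println(model.counts()); // {18-24=3, 25-30=1}

        model.upsert(2, "25-30"); // Alice's birthday
        System.out.println(model.counts()); // {18-24=2, 25-30=2}
    }
}
```

The running total I'm currently seeing corresponds to this model with the subtract step removed.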
Edit: here is some additional behavior that I noticed that seems unexpected.
The topology I'm using looks like:
dataKTable = builder.table("compacted-topic-1", "users-json")
.groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
.count("age-range-counts")
1) Empty State
Now, from the initial, empty state, everything looks like this:
compacted-topic-1
(empty)
dataKTable
(empty)
// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)
// count()
age-range-counts state store
(empty)
2) Send a couple of messages
Now, let's send a couple of messages to compacted-topic-1, which is streamed as a KTable above. Here is what happens:
compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4
// count()
age-range-counts state store
18-24 => 0
So I'm wondering:
- Is what I'm trying to do even possible using Kafka Streams 0.10.1 or 0.10.2? I've tried using groupBy and count in the DSL, but maybe I need to use something like reduce?
- Also, I'm having a little trouble understanding the circumstances that lead to the add reducer and the subtract reducer being called, so any clarification around any of these points will be greatly appreciated.
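For reference, this is my current mental model of when the two reducers fire, written as a small plain-Java sketch. It is an assumption on my part, not taken from the Kafka Streams source: the class ReducerCallModel and its apply method are hypothetical, and the subtract-before-add ordering is simply what the sketch chooses. It also ignores details such as how the very first record for a group initializes the aggregate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of which reducer is invoked for each change to the source table. */
class ReducerCallModel {
    // Models the source KTable: record key -> current group.
    final Map<String, String> table = new HashMap<>();
    // Log of reducer invocations, in the order this model makes them.
    final List<String> calls = new ArrayList<>();

    /** Apply one change; a null newGroup models a tombstone (delete). */
    void apply(String key, String newGroup) {
        String oldGroup = (newGroup == null) ? table.remove(key) : table.put(key, newGroup);
        if (oldGroup != null) {
            // An earlier value existed, so it is retracted from its old group.
            calls.add("subtract(" + oldGroup + ")");
        }
        if (newGroup != null) {
            // A new value exists, so it is added to its group.
            calls.add("add(" + newGroup + ")");
        }
    }

    public static void main(String[] args) {
        ReducerCallModel model = new ReducerCallModel();
        model.apply("2", "18-24"); // insert: add only
        model.apply("2", "25-30"); // update: subtract old, add new
        model.apply("2", null);    // delete: subtract only
        System.out.println(model.calls);
    }
}
```

If this model is right, an insert triggers only the add reducer, an update triggers both, and a delete triggers only the subtract reducer. Please correct me if the real behavior differs.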