Using Kafka Streams, I want to group a stream S of elements E by some key K1 while aggregating all values for that same key into a joined result AGG. This results in a KTable T1.
The aggregated result should then be repartitioned into a second KTable T2, grouped by a key K2 that is derived from AGG itself. In other words, the aggregate determines the key for the next regrouping.
In the end I'm only interested in a KTable T2 where the key is K2 and the value is AGG.
However, this does not work: I only get a KTable T containing the last value, not a value for each key K2.
I know that aggregation results are only forwarded downstream after some delay, so I already tried lowering `commit.interval.ms` to 1, but to no avail.
I also tried using `through` to write the intermediate results to a topic, but that didn't work either.
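```kotlin
// Stage 1: group S by key1 and fold all values into one Agg per key1 (KTable T1)
val finalTable = streamBuilder.kstream("streamS")
    .groupBy { k, v -> createKey1(k, v) }
    .aggregate(
        { Agg.empty() },
        { _, v, previousAgg -> Agg.merge(previousAgg, v) }
    )
    // Turn the updates of T1 back into a stream
    .toStream()
    // .through("table1")
    // Stage 2: regroup by key2 taken from the aggregate and keep the latest Agg (KTable T2)
    .groupBy { _, agg -> agg.createKey2() }
    .reduce { _, agg -> agg }
```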
For a stream S containing the following values:
- key1="123", id="1", startNewGroup="false"
- key1="234", id="2", startNewGroup="false"
- key1="123", id="3", startNewGroup="false"
- key1="123", id="4", startNewGroup="true"
- key1="234", id="5", startNewGroup="false"
- key1="123", id="6", startNewGroup="false"
- key1="123", id="7", startNewGroup="false"
- key1="123", id="8", startNewGroup="true"
I would like the end result to be a KTable with the following latest key-values:
- key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
- key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
- key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
- key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
The original stream S is first grouped by key1. The aggregated result contains the group-by key key1 plus an extra field key2, which combines key1 with the id of the first element of the group.
Whenever the aggregation receives a value with startNewGroup set to true, it returns an aggregate whose key2 field combines key1 with the id of the new value, effectively starting a new subgroup.
In the second regrouping, we simply group by the key2 field.
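To make the first-stage logic concrete, here is a minimal plain-Kotlin sketch of what `Agg.empty()`, `Agg.merge()` and `createKey2()` might look like. The question does not show these, so the element type `E`, the field names and the `ids` list representation are assumptions chosen to mirror the example values; the sketch has no Kafka dependency.

```kotlin
// Hypothetical element type mirroring the example records (key1, id, startNewGroup).
data class E(val key1: String, val id: String, val startNewGroup: Boolean)

// Hypothetical aggregate: key2 is "<key1>-<id of the first element of the current subgroup>".
data class Agg(val key1: String?, val key2: String?, val ids: List<String>) {

    // Key for the second regrouping; only valid once at least one element was merged.
    fun createKey2(): String = requireNotNull(key2) { "empty aggregate has no key2" }

    companion object {
        fun empty(): Agg = Agg(key1 = null, key2 = null, ids = emptyList())

        fun merge(previous: Agg, v: E): Agg =
            if (previous.key2 == null || v.startNewGroup) {
                // First element for this key1, or an explicit restart: open a new subgroup.
                Agg(v.key1, "${v.key1}-${v.id}", listOf(v.id))
            } else {
                // Same subgroup: keep key1/key2 and append the id.
                previous.copy(ids = previous.ids + v.id)
            }
    }
}
```

Because the aggregate for key1="123" moves from key2="123-1" to "123-4" to "123-8", the second stage can only see the earlier subgroups if it receives the intermediate aggregates, not just the final one.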
However, what we actually observe is the following:
- key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
- key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
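The observed table is exactly what results if the second stage only receives the final T1 value per key1, which is consistent with the record-cache comments below. The following plain-Kotlin sketch (no Kafka dependency; `Elem` and `simulate` are hypothetical names) replays the example stream through both stages under two assumptions: every T1 update is forwarded, or only the latest update per key1 is forwarded, as a cache would do if all records are processed before it is flushed.

```kotlin
// Each input element: key1, id, startNewGroup (mirrors the example stream S).
data class Elem(val key1: String, val id: String, val startNewGroup: Boolean)

// Replays the input through both stages. forwardEveryUpdate = true models a first stage
// without caching; false models a cache that emits only the latest T1 value per key1.
fun simulate(input: List<Elem>, forwardEveryUpdate: Boolean): Map<String, List<String>> {
    val t1 = LinkedHashMap<String, Pair<String, List<String>>>() // key1 -> (key2, ids)
    val t2 = LinkedHashMap<String, List<String>>()                // key2 -> ids
    for (e in input) {
        val prev = t1[e.key1]
        val next = if (prev == null || e.startNewGroup) "${e.key1}-${e.id}" to listOf(e.id)
                   else prev.first to prev.second + e.id
        t1[e.key1] = next
        // Second stage: reduce { _, agg -> agg } keeps the latest aggregate per key2.
        if (forwardEveryUpdate) t2[next.first] = next.second
    }
    // With de-duplication, only the final T1 value per key1 reaches the second stage.
    if (!forwardEveryUpdate) t1.values.forEach { (key2, ids) -> t2[key2] = ids }
    return t2
}
```

With `forwardEveryUpdate = true` the result matches the desired table (four keys); with `false` it matches the observed table (only 234-2 and 123-8).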
Comments:
- ...`stream().groupBy().aggregate().groupBy().aggregate()` -- using a custom `transform()` does not seem to be the simplest solution. Note that converting the first result into a `stream()` changes the semantics from "update" to "facts". - Matthias J. Sax
- ...`Materialized.as(null).withCachingDisabled()`. -- What I don't understand is why you say "However, this does not work. I only get a KTable T for the last value. Not a value for each key K2"? And why reducing the commit interval does not give you "quicker updates" downstream is also unclear to me. - Matthias J. Sax
- ...`cache.max.bytes.buffering` to zero in `StreamsConfig`. - Matthias J. Sax
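Following the last comment, a minimal sketch of disabling the record cache globally, assuming plain `java.util.Properties` are passed to `KafkaStreams`. The string keys are the values of the corresponding `StreamsConfig` constants; the helper name `streamsProps` and the example values are hypothetical. Alternatively, caching can be turned off only for the first aggregation's store via `Materialized.withCachingDisabled()`. In recent Kafka releases `cache.max.bytes.buffering` is deprecated in favor of `statestore.cache.max.bytes`, so check which name your version expects.

```kotlin
import java.util.Properties

// Hypothetical helper: Streams settings with the record cache disabled, so every
// KTable update is forwarded downstream instead of only the latest value per key.
fun streamsProps(appId: String, bootstrap: String): Properties = Properties().apply {
    put("application.id", appId)
    put("bootstrap.servers", bootstrap)
    // 0 bytes of cache: consecutive updates to the same key are not de-duplicated.
    put("cache.max.bytes.buffering", 0L)
}
```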