We are using Kafka Streams aggregation with a time window to compute the final sum of events. We have implemented our requirement, but we have a problem with intermediate aggregation results. According to the Kafka memory management documentation ( https://kafka.apache.org/11/documentation/streams/developer-guide/memory-mgmt.html ), there seems to be no way to discard these intermediate results, which affect the final results. Please consider the explanation below, which is taken from that documentation.
Use the following example to understand the behaviors with and without record caching. In this example, the input is a KStream<String, Integer> with the records <K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>. The focus in this example is on the records with key == A.
An aggregation computes the sum of record values, grouped by key, for the input and returns a KTable<String, Integer>.
Without caching: a sequence of output records is emitted for key A that represent changes in the resulting aggregation table. The parentheses (()) denote changes, the left number is the new aggregate value and the right number is the old aggregate value: <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.
With caching: a single output record is emitted for key A that would likely be compacted in the cache, leading to a single output record of <A, (321, null)>. This record is written to the aggregation's internal state store and forwarded to any downstream operations.
The cache size is specified through the cache.max.bytes.buffering parameter, which is a global setting per processing topology.
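
For reference, a minimal sketch of how that parameter might be set. It uses only java.util.Properties and the literal key from the documentation, so it runs without the Kafka libraries; the class and method names are hypothetical. In a real application the same key is available as the constant StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, and the properties would also need the usual application and broker settings.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
class CacheConfigSketch {

    // Builds streams properties with the record cache size set to cacheBytes.
    // A value of 0 disables record caching.
    static Properties streamsProperties(long cacheBytes) {
        Properties props = new Properties();
        // Key as named in the memory management documentation.
        props.put("cache.max.bytes.buffering", cacheBytes);
        return props;
    }

    public static void main(String[] args) {
        // 10 MB cache shared by the whole processing topology.
        Properties withCache = streamsProperties(10 * 1024 * 1024L);
        // Cache disabled: every update is forwarded downstream.
        Properties noCache = streamsProperties(0L);
        System.out.println(withCache);
        System.out.println(noCache);
    }
}
```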
According to the documentation, when aggregation is used without caching, the output records contain incremental results. (We note that this sometimes happens even with caching.) Our problem is that we have another application that acts on these output aggregations and performs further calculations. When the output contains intermediate aggregations, those calculations go wrong. For example, we might start calculating when we receive the <A, (21, 1)> event, whereas the correct calculation should be done on <A, (321, null)> for that time window.
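
To make the two output shapes from the documentation concrete, here is a small plain-Java simulation. It is only a sketch: it does not use Kafka Streams, the class and method names are hypothetical, and the caching variant assumes that all four records arrive before a single cache flush, which is not guaranteed in practice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the documentation example; not Kafka Streams code.
class AggregationOutputSketch {

    // One change record: key plus (new aggregate, old aggregate).
    static final class Change {
        final String key;
        final Integer newValue;
        final Integer oldValue; // null when no earlier aggregate was forwarded

        Change(String key, Integer newValue, Integer oldValue) {
            this.key = key;
            this.newValue = newValue;
            this.oldValue = oldValue;
        }

        @Override
        public String toString() {
            return "<" + key + ", (" + newValue + ", " + oldValue + ")>";
        }
    }

    // Without caching: every input record produces one change record.
    static List<Change> withoutCaching(String[] keys, int[] values) {
        Map<String, Integer> store = new LinkedHashMap<>();
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Integer old = store.get(keys[i]);
            int updated = (old == null ? 0 : old) + values[i];
            store.put(keys[i], updated);
            out.add(new Change(keys[i], updated, old));
        }
        return out;
    }

    // With caching, ASSUMING one flush after all records have been processed:
    // updates to the same key are merged and only the latest value is emitted.
    static List<Change> withCachingSingleFlush(String[] keys, int[] values) {
        Map<String, Integer> cache = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            cache.merge(keys[i], values[i], Integer::sum);
        }
        List<Change> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            out.add(new Change(e.getKey(), e.getValue(), null));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"A", "D", "A", "A"};
        int[] values = {1, 5, 20, 300};
        // prints [<A, (1, null)>, <D, (5, null)>, <A, (21, 1)>, <A, (321, 21)>]
        System.out.println(withoutCaching(keys, values));
        // prints [<A, (321, null)>, <D, (5, null)>]
        System.out.println(withCachingSingleFlush(keys, values));
    }
}
```

In the first output, a downstream consumer that reacts to every record for key A would see 1 and 21 before the final 321, which is exactly the situation that breaks our other calculations.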
Our requirement is to perform the other calculations only on the final aggregation for that window. We have the following questions about Kafka Streams aggregation:
- When Kafka outputs intermediate results, don't those outputs already contain aggregated data? For example, consider the output <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>. Here the second output event <A, (21, 1)> and the third output event <A, (321, 21)> already contain aggregated values. Is this correct?
- Is there a way to identify intermediate results for a window?