1
votes

We are using kafka-stream aggregation with the time window to compute end sum of events. We have implemented our requirement but we have a problem with intermediate aggregation results. According to the Kafka memory management documentation ( https://kafka.apache.org/11/documentation/streams/developer-guide/memory-mgmt.html ) it seems like there is no way to discard these interemediate results which effects the final results. Please consider below explanation which is taken from the above documentation.

Use the following example to understand the behaviors with and without record caching. In this example, the input is a KStream<String,Integer> with the records <K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>. The focus in this example is on the records with key == A.

An aggregation computes the sum of record values, grouped by key, for the input and returns a KTable<String, Integer>.

Without caching: a sequence of output records is emitted for key A that represent changes in the resulting aggregation table. The parentheses (()) denote changes, the left number is the new aggregate value and the right number is the old aggregate value: <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.

With caching: a single output record is emitted for key A that would likely be compacted in the cache, leading to a single output record of <A, (321, null)>. This record is written to the aggregation’s internal state store and forwarded to any downstream operations.

The cache size is specified through the cache.max.bytes.buffering parameter, which is a global setting per processing topology:

According to the documentation when aggregation used without caching output records has incremental results. ( We note that even with the caching sometimes this is happening). Our problem is we have other application which acts upon these output aggregation and do some calculations. Therefore when output has intermediate aggregation, these other calculation goes wrong. For example, we might start calculate other stuff when we have <A (21,1)> event ( correct calculation should be done on <A (321, null)> that time window.

Our requirement is to do other calculations only on final aggregation on that window. We have the following question about kafka stream aggregation

  1. When kakfa output intermediate results, doesn't those output has already aggregated data ? For example consider output <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>. Second output event here <A, (21, 1)> is and third output <A, (321, 21)> has already aggregated value. Is this correct ?
  2. Is there a way to identify intermediate results for a window ?
2
I think example javatips.net/api/examples-master/kafka-streams/src/test/java/io/… can be used here only thing is for the duplicate key I need to take the last entry ( not the first as shown in the example). I tried this also but seems not much easy task ( error prone)Viraj
You would need to build a custom processor with state store and "de-duplicate" intermediate results manually. I also want to point out KIP-328 that suggests to add "final result" capabilities to Kafka Streams: cwiki.apache.org/confluence/display/KAFKA/…Matthias J. Sax

2 Answers

4
votes

Another thing to keep in mind is the commit time interval and cache size governs when results are forwarded downstream.

For example, if your commit interval is 10 seconds that means the results in the cache are forwarded (and written to the changelog topic if logging is enabled) regardless if the cache is full or not.

So you may be able to approximate a single final result if you can set your memory high enough to support setting your commit interval to the desired window time. Of course, this is a coarse-grained approach and effects the entire topology so you'll need to consider and probably prototype a sample application to see if this approach will work for you.

0
votes

I think what are you seeking is the final result of the window. So, you can suppress the result so only the final result is published.

Here is an example of suppressing the final result:

KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));    

As you can see suppress(Suppressed.untilWindowCloses(unbounded())) does the trick.