I am using a Kafka Streams component to build aggregates (sums) over a 30-minute window that advances every minute (a hopping window in Kafka Streams terms), with a grace period of 2 minutes. I am processing on the order of 10,000 time series (groups). The aggregation uses a persistent state store with logging disabled. To output only final results at the end of the aggregation interval, I am using the Suppressed operator.
The aggregates are computed correctly for the majority of time series, but for a small percentage they are not. In many of these cases the aggregated value equals exactly one single record value from the input stream.
Originally I was using the aggregation with the default record cache enabled (cache.max.bytes.buffering of 10 MB) and Suppressed with the withNoBound() option, so that the suppression buffer keeps allocating more memory as needed.
Based on the following suggestion from a post:
Suppression buffer memory is independent of Streams’ record cache, so be sure you have enough heap to host the record cache (cache.max.bytes.buffering) in addition to the sum of all the suppression buffer sizes
I increased the record cache size (cache.max.bytes.buffering) from 10 MB (the default) to 100 MB and obtained a significant improvement in the accuracy of the results. Yet I still occasionally find cases where the aggregates of some groups are computed incorrectly.
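For reference, the cache change described above amounts to the following settings. This is a minimal sketch using plain java.util.Properties with the literal Kafka Streams configuration keys; the class and method names are hypothetical, and the commit interval of 30000 ms is an assumed illustrative value, not something I tuned. In a Spring Cloud Stream application the same keys would normally be passed through the binder configuration rather than built by hand.

```java
import java.util.Properties;

public class RecordCacheConfigSketch {

    // Hypothetical helper: builds the two settings that govern record-cache flushing.
    static Properties recordCacheProperties(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        // Total record cache size in bytes, shared across all stream threads of the instance.
        props.setProperty("cache.max.bytes.buffering", Long.toString(cacheBytes));
        // Commit interval in milliseconds (assumed value, shown only for illustration).
        props.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 100 MB record cache, as in the second experiment described above.
        Properties props = recordCacheProperties(100L * 1024 * 1024, 30_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Because the cache budget is shared across all stream threads, the amount available to each thread shrinks as the thread count grows.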
My aggregation pipeline:
    @StreamListener
    @SendTo("output-aggregated")
    public KStream<String, Aggregate> aggregatePipeline(
            @Input("input-event") KStream<String, Event> inputEventKStream) {

        Duration windowDuration = Duration.ofMinutes(30);
        Duration retentionPeriod = windowDuration;
        Duration advanceDuration = Duration.ofMinutes(1);
        Duration graceDuration = Duration.ofMinutes(2);

        // custom state store
        WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore(
                "aggregate-30m", retentionPeriod, windowDuration, true);
        Materialized<String, Aggregate, WindowStore<Bytes, byte[]>> materializedCustomStore =
                Materialized.as(timestampedWindowStore);
        materializedCustomStore.withKeySerde(Serdes.String()).withLoggingDisabled();

        // stream processing
        TimeWindowedKStream<String, Event> timeWindowedKStream = inputEventKStream
                .filter((key, value) -> isEventMatchingAggregationInterval(value, windowDuration))
                .groupBy((key, value) -> Utils.toTimeserieName(value.getSource(), value.getDimensions()))
                .windowedBy(TimeWindows.of(windowDuration).advanceBy(advanceDuration).grace(graceDuration));

        KTable<Windowed<String>, Aggregate> aggregatedKTable = timeWindowedKStream
                .aggregate(
                        () -> new Aggregate(),
                        (key, newValue, aggregate) -> {
                            aggregate.setSum(newValue.getValue() + aggregate.getSum());
                            aggregate.setCount(aggregate.getCount() + 1);
                            return aggregate;
                        },
                        materializedCustomStore);

        return aggregatedKTable
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withNoBound())
                        .withName("suppressed-30m"))
                .toStream()
                .map((key, value) -> new KeyValue<String, Aggregate>(key.key(), value));
    }
My environment: Kafka Streams 2.5.0, Spring Cloud Hoxton.SR8, Spring Boot 2.3.2
My questions:
1. How can I properly size the record cache to ensure that all aggregates are computed correctly?
2. Why does the cache size affect the calculation of the final results? Since suppress is the operator immediately downstream of the aggregation, wouldn't the Suppressed operator see all the intermediate results for a given window, including those flushed because the cache became full?
   From the documentation:
   The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earliest of commit.interval.ms or cache.max.bytes.buffering (cache pressure) hits. Both commit.interval.ms and cache.max.bytes.buffering are global parameters
3. Is there an option to emit log messages when the cache becomes fully utilized and a flush is triggered?
4. Which metrics would you suggest for tracking the utilization of the record cache and the suppression buffers? Anything in addition to "kafka_stream_state_suppression_buffer_size_max" and "kafka_stream_record_cache_hit_ratio_max"?
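To make the metrics question concrete, here is a rough sketch of how I could dump the relevant values over JMX from inside the application's JVM. It uses only the JDK's javax.management API. The class and method names are hypothetical, and the two MBean patterns (stream-record-cache-metrics and stream-state-metrics in the kafka.streams domain) are assumptions inferred from the exported metric names above. As far as I know, these metrics are only recorded when metrics.recording.level is set to DEBUG.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.Attribute;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class StreamsMetricsDump {

    // Reads every readable attribute of all MBeans matching the given pattern.
    static Map<String, Object> collect(String objectNamePattern) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Object> values = new TreeMap<>();
        for (ObjectName name : server.queryNames(new ObjectName(objectNamePattern), null)) {
            MBeanAttributeInfo[] infos = server.getMBeanInfo(name).getAttributes();
            String[] attrNames = new String[infos.length];
            for (int i = 0; i < infos.length; i++) {
                attrNames[i] = infos[i].getName();
            }
            // getAttributes returns only the attributes that could actually be read.
            for (Attribute attr : server.getAttributes(name, attrNames).asList()) {
                values.put(name + " :: " + attr.getName(), attr.getValue());
            }
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        // Assumed MBean groups, inferred from the metric names quoted in the question.
        String[] patterns = {
            "kafka.streams:type=stream-record-cache-metrics,*",
            "kafka.streams:type=stream-state-metrics,*"
        };
        for (String pattern : patterns) {
            collect(pattern).forEach((key, value) -> System.out.println(key + " = " + value));
        }
    }
}
```

The code has to run in the same JVM as the Kafka Streams instance (for example from a scheduled task); in a JVM without a running Streams application the patterns match nothing and the output is empty.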
Many thanks!