I am using a Kafka Streams component to build aggregates (sums) over a sliding window of 30 minutes with a grace period of 2 minutes. I am processing on the order of 10,000 time series (groups). The aggregation uses a persistent state store with logging disabled. To output only the final result at the end of each aggregation interval, I am using the Suppressed operator.

The aggregates are computed correctly for the majority of time series, but for a small percentage they are not. In many of those cases the aggregated value equals exactly one record value from the input stream. Originally I was using the aggregation with the default record cache (cache.max.bytes.buffering of 10 MB) and Suppressed with the withNoBound() option, so that the suppression buffer keeps allocating memory as needed.

Based on the following post suggestion:

Suppression buffer memory is independent of Streams’ record cache, so be sure you have enough heap to host the record cache (cache.max.bytes.buffering) in addition to the sum of all the suppression buffer sizes

I increased the record cache size (cache.max.bytes.buffering) from 10 MB (the default) to 100 MB, and the accuracy of the results improved significantly. Still, from time to time I find a few cases where the aggregates of some groups are computed incorrectly.
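For reference, a minimal sketch of how the two global settings discussed here might be assembled for a plain Kafka Streams application. The class and method names are hypothetical; only the configuration keys cache.max.bytes.buffering and commit.interval.ms come from Kafka Streams. With Spring Cloud Stream these values would normally be passed through the binder configuration rather than built by hand.

```java
import java.util.Properties;

public class CacheConfigSketch {

    // Builds the two global settings that control when cached aggregates are flushed downstream.
    // The keys are real Kafka Streams config names; the helper itself is illustrative only.
    public static Properties streamsProperties(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        props.setProperty("cache.max.bytes.buffering", Long.toString(cacheBytes));
        props.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 100 MB record cache, shared across all stream threads of the instance
        Properties props = streamsProperties(100L * 1024 * 1024, 30_000L);
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

Setting the cache size to 0 in such a setup disables record caching, which is a quick way to check whether results depend on the cache at all.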

My aggregation pipeline:

@StreamListener
@SendTo("output-aggregated")
public KStream<String, Aggregate> aggregatePipeline(
        @Input("input-event") KStream<String, Event> inputEventKStream) {

    Duration windowDuration = Duration.ofMinutes(30);
    Duration retentionPeriod = windowDuration;
    Duration advanceDuration = Duration.ofMinutes(1);
    Duration graceDuration = Duration.ofMinutes(2);

    // custom state store
    WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
            retentionPeriod, windowDuration, true);
    Materialized<String, Aggregate, WindowStore<Bytes,byte[]>> materializedCustomStore = Materialized.as(timestampedWindowStore);
    materializedCustomStore.withKeySerde(Serdes.String()).withLoggingDisabled();

    // stream processing
    TimeWindowedKStream<String, Event> timeWindowedKStream = inputEventKStream
            .filter((key, value) -> isEventMatchingAggregationInterval(value, windowDuration))
            .groupBy((key, value) -> Utils.toTimeserieName(value.getSource(), value.getDimensions()))
            .windowedBy(TimeWindows.of(windowDuration).advanceBy(advanceDuration).grace(graceDuration));

    KTable<Windowed<String>, Aggregate> aggregatedKTable = timeWindowedKStream
            .aggregate(
                    () -> new Aggregate(),
                    (key, newValue, aggregate) -> {
                        aggregate.setSum(newValue.getValue() + aggregate.getSum());
                        aggregate.setCount(aggregate.getCount() + 1);
                        return aggregate;
                    },
                    materializedCustomStore);

    return aggregatedKTable
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withNoBound()).withName("suppressed-30m"))
            .toStream()
            .map((key, value) -> new KeyValue<String, Aggregate>(key.key(), value))
    ;
}

My environment: Kafka Streams 2.5.0, Spring Cloud Hoxton.SR8, Spring Boot 2.3.2

My questions:

  1. How can I properly size the record cache to ensure that all aggregates are computed correctly?

  2. Why does the cache size affect the calculation of the final results? Since suppress is the operator immediately downstream of the aggregate, wouldn't the Suppressed operator see all the intermediate results for a given window that were flushed once the cache filled up?

From the documentation:

The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earliest of commit.interval.ms or cache.max.bytes.buffering (cache pressure) hits. Both commit.interval.ms and cache.max.bytes.buffering are global parameters

  3. Is there an option to emit logs when the cache is fully utilized and a flush is triggered?

  4. Do you have suggestions on the best metrics for tracking the utilization of the record cache and the suppression buffers? Anything in addition to "kafka_stream_state_suppression_buffer_size_max" and "kafka_stream_record_cache_hit_ratio_max"?

Many thanks!

1 Answer

  1. The problem with the above approach was in the following line of code:

     WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
         retentionPeriod, windowDuration, true); // last argument: retainDuplicates

The final argument, true, is the retainDuplicates setting of the state store. Because it was true, duplicate entries for the same key were kept in the store. As soon as the record cache was full, the intermediate aggregate results were pushed into the store. Since multiple intermediate results were retained for the same key, subsequent aggregate operations for that key could not be computed correctly.

Setting retainDuplicates to false fixes the issue, even with a record cache size of zero, and the final result of the aggregation no longer depends on the size of the record cache.

    WindowBytesStoreSupplier timestampedWindowStore = Stores.persistentTimestampedWindowStore("aggregate-30m",
        retentionPeriod, windowDuration, false);
  2. The problem was not in the Suppressed operator but in the misconfigured state store used by the aggregate operator.

  3. DEBUG-level logs for the package org.apache.kafka.streams.processor.internals provide commit information.
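To illustrate the first point, here is a deliberately simplified toy model in plain Java. It is not the Kafka Streams implementation: ToyStore, its sequence-number keying, and sumForA are assumptions made up for this sketch. It only shows how a store that never overwrites an entry can leave the aggregator without a single current value to read, so that the aggregate restarts from the initializer and ends up reflecting one record.

```java
import java.util.HashMap;
import java.util.Map;

public class RetainDuplicatesSketch {

    /** Toy stand-in for a window store; NOT the Kafka Streams implementation. */
    static class ToyStore {
        private final boolean retainDuplicates;
        private final Map<String, Long> entries = new HashMap<>();
        // Assumption for this sketch: every write gets a fresh sequence number
        // when duplicates are retained, so earlier entries are never overwritten.
        private int seq = 0;

        ToyStore(boolean retainDuplicates) {
            this.retainDuplicates = retainDuplicates;
        }

        void put(String key, long value) {
            if (retainDuplicates) {
                seq++;
            }
            entries.put(key + "#" + seq, value);
        }

        // Point lookup using the current sequence number; may miss older entries of the key.
        Long get(String key) {
            return entries.get(key + "#" + seq);
        }
    }

    /** Sums interleaved events for keys A and B with no cache in front of the store; returns A's sum. */
    public static long sumForA(boolean retainDuplicates) {
        ToyStore store = new ToyStore(retainDuplicates);
        String[] keys = {"A", "B", "A", "B", "A"};
        long[] values = {1, 10, 2, 20, 3};
        long latestA = 0;
        for (int i = 0; i < keys.length; i++) {
            Long previous = store.get(keys[i]);
            // A missing previous value means the aggregate starts again from the initializer (0).
            long updated = (previous == null ? 0L : previous) + values[i];
            store.put(keys[i], updated);
            if (keys[i].equals("A")) {
                latestA = updated;
            }
        }
        return latestA;
    }

    public static void main(String[] args) {
        System.out.println("retainDuplicates=false -> " + sumForA(false)); // prints 6
        System.out.println("retainDuplicates=true  -> " + sumForA(true));  // prints 3
    }
}
```

In this model a record cache in front of the store would hide the problem until entries are evicted, which is consistent with a larger cache reducing, but not eliminating, the wrong results.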