
Trying to merge multiple Kafka Streams, aggregate, and produce to a new topic. However, within the same window, the code produces as many aggregated records as there are input records across the input streams. I would expect the aggregation to produce only one output at the end of the join window. What am I doing wrong in the code below?

val streams = requestStreams.merge(successStreams).merge(errorStreams)
                .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
                .aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
                        .withValueSerde(serdesConfig.notificationMetricSerde()))
                .toStream()

streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))

1 Answer


Kafka Streams uses a continuous update processing model by default. Note that the result of an aggregation is a KTable. This result table contains a row for each window, and each time a new input record is processed, the corresponding window (i.e., row in the table) is updated.

If you call KTable#toStream() you get the table's changelog stream that contains a record for each update to the table.

If you want only a single result per window, you can use the suppress() operator, which produces a second KTable: suppress() consumes the first KTable's changelog stream, waits until a window is closed, and only then inserts the final result into its output KTable. If you use suppress(), you should lower the grace period of the upstream windowed aggregation (the default is 24 hours), i.e., TimeWindows.of(...).grace(...).
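Applied to the topology from the question, this could look roughly as follows (a sketch only; the stream names, serde helpers, and the one-minute grace period are taken from the question or assumed, and the `...` aggregate arguments stand in for the originals):

```kotlin
// Sketch: emit one final result per 10-minute window instead of one
// update per input record. Assumes the same streams/serdes as the question.
val streams = requestStreams.merge(successStreams).merge(errorStreams)
        .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
        // A short grace period lets suppress() emit soon after the window ends;
        // the default grace of 24h would delay the final result by a day.
        .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ofMinutes(1)))
        .aggregate({ null }, StreamAggregators.notificationMetricAggregator,
                Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
                        .withValueSerde(serdesConfig.notificationMetricSerde()))
        // Buffer updates and forward only the final value once the window closes.
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
```

Note that `untilWindowCloses` requires an unbounded (or "shut down when full") buffer config, since dropping a buffered record would violate the "final result only" guarantee.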

For more details check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers