1
votes

I am facing an issue with windowed aggregations. I want to sum the values per key and send the result to the output topic only when the window has finished. The problem is that every event in the "input" topic produces an event in the "output" topic. I would like to publish an event to the output topic only when a window is finished. For example, if the window is one minute long, a single event should be sent per key per minute. The sample code is as follows:

.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.reduce((v1, v2) -> String.valueOf(Integer.parseInt(v1) + Integer.parseInt(v2)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("output_topic");

But I am getting the following exception:

Exception in thread "learningtime_application-665cd31a-1957-448b-8cf7-779ab359cfd2-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000003
Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')


1 Answer

2
votes

You are hitting a known bug: https://issues.apache.org/jira/browse/KAFKA-9259

The suppress operator does not correctly pick up the default serdes from the config: it uses the plain key serde as-is instead of converting it to a windowed key serde, which is why a Windowed key ends up being cast to String.

As a workaround, you need to specify the serdes explicitly in reduce() via Materialized.with(...). You pass in plain key and value serdes, and reduce() will convert the key serde into a windowed key serde that will then also be passed into suppress().
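
Applied to the code from the question, the workaround might look roughly like the sketch below. It assumes String keys and values (as the ClassCastException suggests). The class name WindowedSumTopology, the build() helper and the topic parameters are made up for illustration, and the explicit Consumed/Produced serdes are optional extras rather than part of the fix.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical helper class; only the Materialized.with(...) argument is the actual workaround.
public class WindowedSumTopology {

    public static Topology build(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumption: keys and values are Strings, values hold integer text.
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            // Same window definition as in the question. Newer Kafka Streams
            // releases deprecate TimeWindows.of in favor of ofSizeWithNoGrace.
            .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
            .reduce(
                (v1, v2) -> String.valueOf(Integer.parseInt(v1) + Integer.parseInt(v2)),
                // Plain key/value serdes; reduce() wraps the key serde into a
                // windowed key serde, which suppress() then inherits.
                Materialized.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            // Drop the window from the key before writing to the output topic.
            .toStream((k, v) -> k.key())
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

The returned Topology can be passed to new KafkaStreams(topology, props) as usual; everything else in your application stays the same.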