I am facing an issue while doing the window aggregations. I want to perform sum of the values per key and the result is sent to the output topic only when window is finished. The problem is that every event in "input" topic will produce an event to "output" topic. I would like to publish an event to the output topic only when a window is finished. For example if the window is of one minute, send a single event per key per minute. The sample code is as follows:
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2))
.reduce((v1, v2) -> String.valueOf(Integer.parseInt(v1) + Integer.parseInt(v2)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("output_topic");
But I am getting the following exception:
Exception in thread "learningtime_application-665cd31a-1957-448b-8cf7-779ab359cfd2-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000003 Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')