
Configuration for Kafka Streams:

threads = 1;
replicationFactor = 1;
ktableCommitInterval = 10000;   // ms (10 seconds)
ktableMemory = 72000000;        // bytes (~72 MB)
timeDuration = 10;              // window size in seconds
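The variable names above are the question's own. Assuming they are passed through to the standard Kafka Streams settings, a minimal sketch of the equivalent Properties could look like this. The application.id and bootstrap.servers values are hypothetical placeholders added only so the config is complete, and timeDuration is not a Streams setting at all (it is the window size used in the topology). The same keys are also available as StreamsConfig constants such as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; newer Kafka releases deprecate cache.max.bytes.buffering in favour of statestore.cache.max.bytes.

```java
import java.util.Properties;

// Sketch: assumes the question's variables map onto these standard Kafka Streams keys.
class StreamsProps {
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "window-append-demo");   // hypothetical placeholder
        props.put("bootstrap.servers", "localhost:9092");    // hypothetical placeholder
        props.put("num.stream.threads", "1");                // threads = 1
        props.put("replication.factor", "1");                // replicationFactor = 1
        props.put("commit.interval.ms", "10000");            // ktableCommitInterval = 10 s
        props.put("cache.max.bytes.buffering", "72000000");  // ktableMemory = 72,000,000 bytes
        return props;
    }
}
```

The resulting object is what would be handed to new KafkaStreams(topology, props).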

Topology:

KStream<Windowed<String>, String> windowedStringKStream = streamsBuilder
        .stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        // tumbling windows of timeDuration seconds, no grace period
        .windowedBy(TimeWindows.of(Duration.ofSeconds(timeDuration)).grace(Duration.ofSeconds(0)))
        .reduce(Numbers::append,   // user-defined helper (not shown) that appends the numbers
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as(storeName)
                        .withCachingEnabled()
                        .withRetention(Duration.ofSeconds(timeDuration))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()))
        .toStream();

Code Description:

The code appends numbers within a 10-second window. Incrementing number records are sent to the input topic at exactly 1-second intervals.
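To make the expected per-window values concrete, here is a stdlib-only sketch of the windowed reduce for a single key. Two things are assumptions: the question's append helper is taken to join values with a comma (its real behaviour is not shown), and records get hypothetical timestamps one second apart. Window starts are computed the way Kafka Streams aligns tumbling TimeWindows, namely to the epoch: start = timestamp - (timestamp % size). The class name is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib model of groupByKey().windowedBy(TimeWindows.of(size)).reduce(append) for one key.
// It shows only the final per-window values, not when Kafka Streams emits them.
class WindowedAppend {
    // Tumbling windows are epoch-aligned: a record belongs to the window starting at ts - ts % size.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Hypothetical stand-in for the question's append helper: comma-joins the two values.
    static String append(String agg, String value) {
        return agg + "," + value;
    }

    // Returns windowStart -> aggregated value, ordered by window start.
    static Map<Long, String> aggregate(List<long[]> records, long sizeMs) {
        Map<Long, String> windows = new TreeMap<>();
        for (long[] r : records) {            // r[0] = timestamp in ms, r[1] = the number
            long start = windowStart(r[0], sizeMs);
            windows.merge(start, Long.toString(r[1]), WindowedAppend::append);
        }
        return windows;
    }
}
```

With a hypothetical first record at t = 5 s, the numbers 1 to 5 land in window [0 s, 10 s) and 6 to 12 in [10 s, 20 s): the window boundaries stay on multiples of 10 s rather than moving to wherever the data starts.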

Output: the results in the output topic, with timestamps, as viewed in Kafka Tool. As can be seen, every other output is an intermediate window result.

Problem:

The commit interval is set to 10 seconds and the cache size to 72 MB (the value is given in bytes). The state store has caching enabled. The documentation states that Kafka Streams forwards data downstream when either the cache fills up or the commit interval elapses, whichever happens first. In my experiments, however, commits happen twice a minute. My observation is that the commit interval starts counting when the application starts, whereas windowing starts when data starts arriving. As the output shows, both intermediate and final window results are pushed downstream.
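The interaction described above can be reproduced with a small stdlib-only model. It rests on stated assumptions: the cache never fills (so commits are the only flush trigger), the commit timer starts at application start, each record's timestamp equals its arrival time, and windows are epoch-aligned tumbling windows, which is how Kafka Streams aligns TimeWindows. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Stdlib model of a cached windowed aggregation that is only flushed on commit.
// Assumptions: the cache never fills, the commit timer starts at application start,
// and each record's timestamp equals its arrival time (one record per second).
class CommitFlushModel {
    static final class Emit {
        final long commitAt;        // time of the commit that forwarded this result
        final long windowStart;     // epoch-aligned window start
        final String value;         // aggregate forwarded downstream
        final boolean intermediate; // true if the window had not ended yet at commit time

        Emit(long commitAt, long windowStart, String value, boolean intermediate) {
            this.commitAt = commitAt;
            this.windowStart = windowStart;
            this.value = value;
            this.intermediate = intermediate;
        }
    }

    static List<Emit> run(long appStartMs, long commitMs, long windowMs, long firstRecordMs, int count) {
        Map<Long, String> aggregates = new TreeMap<>(); // windowStart -> current aggregate
        TreeSet<Long> dirty = new TreeSet<>();          // windows updated since the last commit
        List<Emit> out = new ArrayList<>();
        long nextCommit = appStartMs + commitMs;
        for (int n = 1; n <= count; n++) {
            long ts = firstRecordMs + (n - 1) * 1000L;
            while (nextCommit <= ts) {                  // commits that fall before this record
                flush(nextCommit, windowMs, aggregates, dirty, out);
                nextCommit += commitMs;
            }
            long start = ts - ts % windowMs;            // epoch-aligned tumbling window
            aggregates.merge(start, Integer.toString(n), (a, b) -> a + "," + b);
            dirty.add(start);
        }
        flush(nextCommit, windowMs, aggregates, dirty, out); // first commit after the last record
        return out;
    }

    // Forwards the current value of every window touched since the last commit.
    private static void flush(long commitAt, long windowMs, Map<Long, String> aggregates,
                              TreeSet<Long> dirty, List<Emit> out) {
        for (long start : dirty) {
            boolean stillOpen = start + windowMs > commitAt;
            out.add(new Emit(commitAt, start, aggregates.get(start), stillOpen));
        }
        dirty.clear();
    }
}
```

With the application started at t = 5 s and one record per second from t = 5.5 s, run(5000, 10000, 10000, 5500, 25) produces five emissions that alternate final, intermediate, final, intermediate, final, because each commit lands in the middle of a window. In the same model, run(0, 10000, 10000, 5500, 25) emits only final results since every commit coincides with a window end; in a real deployment that alignment is not something to rely on.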

Using suppress() is not an option for my use case, because it does not flush the data if no new data arrives on the topic.
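For context on why suppress() stalls: with Suppressed.untilWindowCloses(...), a window is emitted only once stream time, the largest record timestamp observed so far, reaches the window end plus the grace period, so wall-clock time alone never releases it. The stdlib model below sketches that rule; the class and method names are hypothetical, and the buffer is an in-memory map rather than Kafka's actual suppression buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Stdlib model of untilWindowCloses-style suppression: emission is driven only by stream time.
class StreamTimeSuppress {
    private final long windowMs;
    private final long graceMs;
    private final TreeMap<Long, String> buffered = new TreeMap<>(); // windowStart -> latest aggregate
    private long streamTime = Long.MIN_VALUE;                        // largest record timestamp seen

    StreamTimeSuppress(long windowMs, long graceMs) {
        this.windowMs = windowMs;
        this.graceMs = graceMs;
    }

    // Called for each upstream update; returns the window results that are now closed.
    List<String> onUpdate(long recordTs, long windowStart, String aggregate) {
        streamTime = Math.max(streamTime, recordTs);
        buffered.put(windowStart, aggregate);
        List<String> emitted = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() + windowMs + graceMs <= streamTime) {
            emitted.add(buffered.pollFirstEntry().getValue());
        }
        return emitted;
    }

    // Windows still held back; without new records this never shrinks.
    int pending() {
        return buffered.size();
    }
}
```

Fed updates at t = 5.5 s and 9.5 s for window [0 s, 10 s) and one at 10.5 s for [10 s, 20 s), the model releases the first window only when the 10.5 s record arrives. The second window then stays buffered until some later record pushes stream time past 20 s, which is the situation described above when input stops.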

Any help would be appreciated. If anyone is facing the same issue or wants to reproduce it, let me know.


1 Answer


The only solution would be to build a custom transform() that implements your own version of "suppress", one that can emit data even when no new input arrives (e.g., wall-clock-time punctuations might help to implement it).
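A rough sketch of that idea, under stated assumptions. In Kafka Streams the logic would sit in a Transformer attached with transform(), keep its buffer in a state store, and register a callback via ProcessorContext.schedule(Duration, PunctuationType.WALL_CLOCK_TIME, Punctuator); the stdlib model below captures only the buffering policy the punctuator might apply. It assumes record timestamps roughly track wall-clock time, and delayMs is a hypothetical tuning knob, not a Kafka setting. The class and method names are hypothetical too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Stdlib model of a wall-clock-driven "suppress". In Kafka Streams the buffer would be a
// state store inside a Transformer, and punctuate() would be a WALL_CLOCK_TIME Punctuator.
// Assumption: record timestamps are close to wall-clock time.
class WallClockSuppress {
    private final long windowMs;
    private final long delayMs;   // hypothetical extra wait after the window end before emitting
    private final TreeMap<Long, String> buffered = new TreeMap<>(); // windowStart -> latest aggregate

    WallClockSuppress(long windowMs, long delayMs) {
        this.windowMs = windowMs;
        this.delayMs = delayMs;
    }

    // Called for every upstream (possibly intermediate) result: keep only the latest value.
    void onUpdate(long windowStart, String aggregate) {
        buffered.put(windowStart, aggregate);
    }

    // Called periodically by a wall-clock timer, even when no input arrives.
    List<String> punctuate(long wallClockMs) {
        List<String> emitted = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() + windowMs + delayMs <= wallClockMs) {
            emitted.add(buffered.pollFirstEntry().getValue());
        }
        return emitted;
    }
}
```

One design caveat in this sketch: with caching enabled upstream, the aggregation forwards a window's latest value only on commit, so delayMs should be at least the commit interval (10 s here), or caching should be disabled on the store with withCachingDisabled(); otherwise the punctuator could emit a stale intermediate value.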

Currently (as of the Apache Kafka 2.4 release), there is no built-in support for this. If you don't use suppress(), a windowed aggregation may always emit intermediate results before the window is actually closed, and there is no way to configure Kafka Streams otherwise.