I have a Kafka Streams application whose consumed-topic offsets get reset whenever I restart it. As a result, the lag increases for all partitions and the app has to reprocess all the data.
UPDATE: After the app restarts, the output topic receives a burst of events that had already been processed. It is not the input topic offsets that are being reset, as I said in the previous paragraph. However, the offsets of the internal topic (KTABLE-SUPPRESS-STATE-STORE) are being reset; see the comments below.
Before the restart, I made sure the lag was 1 for every partition (this is for the output topic). All consumers belonging to that consumer group ID (app-id) are active. The restart is immediate; it takes around 30 seconds.
The app uses exactly-once as its processing guarantee.
I have read this answer: "How does an offset expire for an Apache Kafka consumer group?"
I have tried with auto.offset.reset = latest and auto.offset.reset = earliest.
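For reference, here is a minimal sketch of the settings mentioned so far, written as plain `java.util.Properties` with the standard config names (in a real app these would be passed to the `KafkaStreams` constructor). The application id and broker address are hypothetical placeholders; `exactly_once` is the Kafka 2.x value of `processing.guarantee`.

```java
import java.util.Properties;

class StreamsConfigSketch {

    // Returns the settings discussed in the question as plain string keys
    // (the same names the StreamsConfig / ConsumerConfig constants resolve to).
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // hypothetical; also used as the consumer group id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("processing.guarantee", "exactly_once"); // Kafka 2.x value ("exactly_once_v2" from 3.0 on)
        // commit.interval.ms already defaults to 100 ms under exactly_once; set here only to make it visible.
        props.put("commit.interval.ms", "100");
        // Only consulted when the group has no valid committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```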
It seems that the offsets for these topics are not being committed effectively (but I am not sure about this).
I assume that after the restart the app should pick up from the latest committed offset for that consumer group.
UPDATE: I am assuming this for the internal topic (KTABLE-SUPPRESS-STATE-STORE).
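The expectation above matches how a group member normally chooses its starting position, and it also explains why switching `auto.offset.reset` between `latest` and `earliest` should make no difference while valid committed offsets exist. The following is a simplified model of that rule, not Kafka's code; the class, method, and parameter names are hypothetical.

```java
import java.util.OptionalLong;

class StartOffsetSketch {

    // Simplified model of where a group member starts reading one partition.
    // committed:   offset committed for the group, empty if none exists (or it expired)
    // resetPolicy: value of auto.offset.reset
    // logStart:    earliest offset still in the log
    // logEnd:      offset of the next record to be written
    static long startOffset(OptionalLong committed, String resetPolicy, long logStart, long logEnd) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // normal case: resume exactly where the group left off
            }
            // Out of range (e.g. deleted by retention): fall back to the reset policy below.
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default: // "none" makes the real consumer throw instead of choosing a position
                throw new IllegalStateException("no valid committed offset and auto.offset.reset=" + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // A committed offset wins regardless of the reset policy.
        System.out.println(startOffset(OptionalLong.of(42), "latest", 0, 100));   // 42
        // Without one, the policy decides.
        System.out.println(startOffset(OptionalLong.empty(), "earliest", 0, 100)); // 0
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0, 100));   // 100
    }
}
```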
Does the Kafka Streams API ensure that all consumed offsets are committed before shutting down (after calling streams.close())?
I would really appreciate any clue about this.
UPDATE:
This is the code the app executes:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
    .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...));
events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, newValue) -> { // "new" is a reserved word in Java and cannot be a parameter name
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
        Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
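Because `advanceBy` equals the window size, these are tumbling windows, and `suppress(untilWindowCloses(...))` is meant to hold each window's aggregate until stream time reaches window end + grace, then emit it exactly once. Re-emitting already-closed windows after a restart is therefore not the intended behavior. The sketch below is a simplified, single-key, in-memory model of those semantics under stated assumptions (no changelog or restore, `reduce` replaced by a sum, non-negative timestamps); it is not Kafka's implementation, and `SuppressSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model of windowedBy(tumbling).reduce(sum).suppress(untilWindowCloses(...)).
class SuppressSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;                   // highest timestamp seen so far
    private final TreeMap<Long, Long> buffer = new TreeMap<>(); // window start -> current aggregate
    final List<String> emitted = new ArrayList<>();             // final results, one per window

    SuppressSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(long timestampMs, long value) {
        // advanceBy == size, so each record falls into exactly one window.
        long windowStart = timestampMs - (timestampMs % sizeMs);
        streamTime = Math.max(streamTime, timestampMs);
        // A window accepts records until stream time reaches windowEnd + grace; later ones are dropped.
        if (windowStart + sizeMs + graceMs > streamTime) {
            buffer.merge(windowStart, value, Long::sum);
        }
        // untilWindowCloses: emit each buffered window once it has closed, then forget it.
        while (!buffer.isEmpty() && buffer.firstKey() + sizeMs + graceMs <= streamTime) {
            long start = buffer.firstKey();
            long aggregate = buffer.remove(start);
            emitted.add("[" + start + "," + (start + sizeMs) + ")=" + aggregate);
        }
    }

    public static void main(String[] args) {
        SuppressSketch s = new SuppressSketch(10, 5); // 10 ms windows, 5 ms grace
        long[][] records = {{1, 1}, {8, 2}, {12, 4}, {3, 100}, {15, 1}, {2, 50}};
        for (long[] r : records) {
            s.process(r[0], r[1]);
        }
        System.out.println(s.emitted); // [[0,10)=103]
    }
}
```

Here the record at timestamp 3 is still counted because it arrives within the grace period, while the record at timestamp 2 arrives after window [0,10) has closed and is dropped; the window is emitted only once.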
After a restart, the offset reset always happens, and only happens, for the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.
I have tried with both the exactly-once and the at-least-once processing guarantees.
Once again, I would really appreciate any clue about this.
UPDATE: This has been fixed in release 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895).
bin/kafka-consumer-groups.sh tool. I have not tried other processing guarantees since in my case I need exactly-once, but it is worth trying just to rule out that this could be the issue. However, I would be surprised if it were. – Jonathan Santilli
streams.close() -- try to verify this. Also, offsets should be committed every 100 ms by default if exactly-once is enabled. Not sure why the offsets are not picked up again at startup. I would recommend inspecting the logs; maybe increase the log level to DEBUG to get more information. – Matthias J. Sax