6
votes

I have a Kafka Streams application for which, whenever I restart it, the offsets for the topic it is consuming get reset. Hence, for all partitions, the lags increase and the app needs to reprocess all the data.

UPDATE: After the app is restarted, the output topic receives a burst of events that were already processed. It is not the input topic offsets that are getting reset, as I said in the previous paragraph. However, the offsets of the internal topic (KTABLE-SUPPRESS-STATE-STORE) are getting reset; see the comments below.

I have ensured the lag is 1 for every partition before the restart (this is for the output topic). All consumers that belong to that consumer group id (the app id) are active. The restart is quick; it takes around 30 seconds.

The app uses exactly-once as its processing guarantee.
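For reference, a minimal sketch of how exactly-once is typically switched on for a Kafka Streams app. It uses plain string keys (application.id, bootstrap.servers, processing.guarantee) so that it compiles without the kafka-streams jar; the class and method names are hypothetical, the broker address is a placeholder, and in a real app the resulting Properties would be passed to the KafkaStreams constructor.

```java
import java.util.Properties;

// Sketch: Properties for a Kafka Streams app with exactly-once enabled.
// Plain string keys are used so this compiles without the kafka-streams jar;
// a real app would usually use the StreamsConfig constants instead.
class StreamsProps {

    static Properties exactlyOnce(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // application.id is also used as the consumer group id and as the prefix of
        // internal topic names, e.g. <application.id>-KTABLE-SUPPRESS-STATE-STORE-...-changelog
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // "exactly_once" is the 2.x-era value; newer releases use "exactly_once_v2"
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // "MI-APP-ID" comes from the question's logs; the broker address is an assumed placeholder
        Properties props = exactlyOnce("MI-APP-ID", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee")); // prints exactly_once
    }
}
```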

I have read this answer: "How does an offset expire for an Apache Kafka consumer group?"

I have tried both auto.offset.reset = latest and auto.offset.reset = earliest.
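Worth keeping in mind: auto.offset.reset is only a fallback. The toy model below (hypothetical class and method names, not the Kafka consumer's actual code) sketches the rule: a committed offset that is still within the log is used as-is, and the reset policy only applies when there is no committed offset or it is out of range. So switching between latest and earliest should not change where a group with valid committed offsets resumes.

```java
import java.util.OptionalLong;

// Toy model (hypothetical, not the Kafka consumer's implementation) of how a
// consumer group member chooses the offset to start reading a partition from.
class StartOffsetModel {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            // A committed offset that is still inside the log is used as-is;
            // auto.offset.reset is not consulted at all.
            if (c >= logStartOffset && c <= logEndOffset) {
                return c;
            }
        }
        // No committed offset, or it is out of range: fall back to the reset policy.
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                // e.g. "none": the real client raises an error instead of picking an offset
                throw new IllegalStateException(
                        "no usable committed offset, auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // The committed offset 42 wins even though the policy is "latest"
        System.out.println(startingOffset(OptionalLong.of(42), "latest", 0, 100)); // prints 42
    }
}
```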

It seems like the offsets for these topics are not actually being committed (but I am not sure about this).

I assume that after the restart the app should pick up from the latest committed offset for that consumer group.

UPDATE: I assume the same for the internal topic (KTABLE-SUPPRESS-STATE-STORE).

Does the Kafka Streams API guarantee that all consumed offsets are committed before shutting down (after calling streams.close())?

I would really appreciate any clue about this.

UPDATE:

This is the code the app executes:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
                .withTimestampExtractor(...));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    // "new" is a reserved word in Java, so it cannot be used as a lambda parameter name
    .reduce((agg, newValue) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
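To make the suppress() step easier to reason about, here is a toy in-memory model (a hypothetical class, not Kafka Streams code) of tumbling windows (advanceBy equal to the window size, as in the topology above) combined with reduce() and Suppressed.untilWindowCloses(). It assumes non-negative timestamps and a single shared stream time, and mirrors the rule that a window's final result is emitted only once stream time reaches windowEnd + grace; records arriving after that are dropped as late. The buffered map stands in for the state that the real operator keeps in memory and backs with the KTABLE-SUPPRESS-STATE-STORE changelog topic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy in-memory model (hypothetical, not Kafka Streams code) of
// windowedBy(tumbling TimeWindows).reduce(...).suppress(untilWindowCloses(...)).
// Assumes non-negative timestamps and a single, shared stream time.
class SuppressModel {
    private final long sizeMs;
    private final long graceMs;
    private final BinaryOperator<Long> reducer;
    // Pending aggregates keyed by "key@windowStart": a stand-in for the suppress buffer.
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;

    SuppressModel(long sizeMs, long graceMs, BinaryOperator<Long> reducer) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.reducer = reducer;
    }

    // Processes one record; returns the final results of windows that closed as a result.
    List<String> process(String key, long value, long timestampMs) {
        List<String> emitted = new ArrayList<>();
        streamTime = Math.max(streamTime, timestampMs);
        long closeTime = streamTime - graceMs; // windows ending at or before this are closed
        long windowStart = timestampMs - (timestampMs % sizeMs); // tumbling: advance == size
        if (windowStart + sizeMs <= closeTime) {
            return emitted; // late record for a window that already closed: dropped
        }
        buffer.merge(key + "@" + windowStart, value, reducer);
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            String k = e.getKey();
            long start = Long.parseLong(k.substring(k.lastIndexOf('@') + 1));
            if (start + sizeMs <= closeTime) {
                emitted.add(k + "=" + e.getValue()); // final result of a closed window
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressModel m = new SuppressModel(10, 0, Long::sum);
        m.process("a", 1, 1);
        m.process("a", 2, 5);
        System.out.println(m.process("a", 4, 12)); // prints [a@0=3]
    }
}
```

The key point for this question is that, between a window's last update and its closing, the pending aggregate exists only in this buffer, so the buffer has to be rebuilt after a restart before anything can be emitted.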

The offset reset always happens after restarting, and only with the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.

I have tried both the exactly-once and the at-least-once processing guarantees.

Once again, I would really appreciate any clue about this.

UPDATE: This has been fixed in release 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895).

You say you are not sure whether the offsets are really committed. Have you looked at the __consumer_offsets topic to verify this? This blog post might help: medium.com/@felipedutratine/… – user152468
Have you checked whether it works without the exactly-once processing guarantee? This is a relatively new feature. – user152468
Thanks for the comment, @user152468. Yes, I have checked with the bin/kafka-consumer-groups.sh tool. I have not tried other processing guarantees, since in my case I need exactly-once, but it is worth trying just to rule it out as the cause. However, I would be surprised if it is. – Jonathan Santilli
Offsets should be committed on streams.close(); try to verify this. Also, with exactly-once enabled, offsets should be committed every 100 ms by default. I am not sure why the offsets are not picked up again at startup. I would recommend inspecting the logs; maybe increase the log level to DEBUG to get more information. – Matthias J. Sax
Hello @MatthiasJ.Sax, thanks for the reply. I have found this in the logs: INFO [MI-APP-ID-xxx-StreamThread-4] internals.StoreChangelogReader (StoreChangelogReader.java:215) - stream-thread [MI-APP-ID-xxx-StreamThread-4] No checkpoint found for task 1_5 state store KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog MI-APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-5 with EOS turned on. Reinitializing the task and restore its state from the beginning. ... (continued in the next comment) – Jonathan Santilli

3 Answers

2
votes

The offset reset always happens after restarting, and only with the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.

This is currently (version 2.1) expected behavior, because the suppress() operator works in memory only. Thus, on restart, the suppress buffer must be recreated from the changelog topic before processing can start.
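A toy sketch of what "recreating the buffer from the changelog" means (hypothetical names, not Kafka Streams' actual restoration code): because nothing of an in-memory buffer survives a restart, every changelog record is replayed in offset order, the latest value per key wins, and, assuming evicted entries were logged as tombstones (null values), those entries are removed again. Restore time therefore grows roughly with the amount of changelog data that has to be replayed.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Kafka Streams' restoration code) of rebuilding an
// in-memory buffer after a restart by replaying its changelog from the beginning.
class ChangelogRestore {

    // changelog: records in offset order; a null value is treated as a tombstone.
    static Map<String, String> restore(List<Map.Entry<String, String>> changelog) {
        Map<String, String> buffer = new HashMap<>(); // empty: in-memory state did not survive
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                buffer.remove(record.getKey()); // assumed: entry was evicted before the restart
            } else {
                buffer.put(record.getKey(), record.getValue()); // latest value per key wins
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("a@0", "1"));
        changelog.add(new SimpleEntry<>("a@0", "3"));
        changelog.add(new SimpleEntry<>("b@0", "7"));
        changelog.add(new SimpleEntry<>("a@0", null)); // a@0 was emitted and evicted
        System.out.println(restore(changelog)); // prints {b@0=7}
    }
}
```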

Note, it is planned to let suppress() write to disk in future releases (cf. https://issues.apache.org/jira/browse/KAFKA-7224). This will avoid the overhead of recreating the buffer from the changelog topic.

0
votes

I think @Matthias J. Sax's reply covers most of the internals of suppress. One thing I need to clarify, though: when you say "restart the application", what exactly did you do? Did you shut down the whole application gracefully and then restart it?

-2
votes

Commit frequency is controlled by the commit.interval.ms parameter. Check whether your offsets are actually committed. By default, offsets are committed every 100 ms with exactly-once or every 30 seconds with at-least-once, depending on your processing.guarantee setting. Check this out.
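A small sketch of the default described above, using a hypothetical helper: an explicitly configured commit.interval.ms always wins; otherwise the default is 100 ms for the exactly-once guarantees and 30000 ms (30 seconds) for at_least_once.

```java
// Sketch (hypothetical helper) of the default commit.interval.ms rule in Kafka Streams.
class CommitInterval {

    // explicitMs is null when commit.interval.ms was not set by the user.
    static long effectiveCommitIntervalMs(String processingGuarantee, Long explicitMs) {
        if (explicitMs != null) {
            return explicitMs; // an explicit setting always wins
        }
        // exactly_once (and its later variants) default to 100 ms; at_least_once to 30 s
        return processingGuarantee.startsWith("exactly_once") ? 100L : 30_000L;
    }

    public static void main(String[] args) {
        System.out.println(effectiveCommitIntervalMs("exactly_once", null));  // prints 100
        System.out.println(effectiveCommitIntervalMs("at_least_once", null)); // prints 30000
    }
}
```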