In my Kafka Streams application I have a single processor that is scheduled to produce an output message every 60 seconds. The output message is built from messages that come from a single input topic. Sometimes the output message exceeds the limit configured on the broker (1MB by default), an exception is thrown, and the application shuts down. The commit interval is set to the default (60s).
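For context, here is a simplified sketch of the processor (it uses the newer Processor API; the class name, output key, and the string concatenation are placeholders for the real aggregation logic):

```java
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Buffers incoming values and emits one aggregated record every 60 seconds
// via a wall-clock punctuator.
public class AggregatingProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        // Punctuate every 60s; the aggregated output can exceed the broker's
        // max message size when many records arrived during the interval.
        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            if (!buffer.isEmpty()) {
                String output = String.join(",", buffer);
                context.forward(new Record<>("aggregate-key", output, timestamp));
                buffer.clear();
            }
            // calling context.commit() here does not change the behavior described below
        });
    }

    @Override
    public void process(Record<String, String> record) {
        buffer.add(record.value());
    }
}
```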
In such a case I would expect that, on the next run, all messages consumed during the 60s preceding the crash would be re-consumed. But in reality the offsets of those messages are committed and the messages are not processed again on the next run.
Reading answers to similar questions, it seems to me that the offsets should not be committed. When I increase the commit interval to 120s (the processor still punctuates every 60s), it works as expected and the offsets are not committed.
I am using the default processing guarantee, but I have also tried `exactly_once`. Both give the same result. Calling `context.commit()` from the processor seems to have no effect on the issue.
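The relevant part of the Streams configuration looks roughly like this (application id and bootstrap servers are placeholders):

```java
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsProps {
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregator-app");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // Default commit interval (60s); raising it to 120_000 leaves the
        // offsets uncommitted after the crash, as described above.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000);
        // Tried both the default (at_least_once) and exactly_once; same result.
        // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
```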
Am I doing something wrong here?