0 votes

In my Kafka Streams application I have a single processor that is scheduled to produce output messages every 60 seconds. The output message is built from messages that come from a single input topic. Sometimes the output message is bigger than the limit configured on the broker (1 MB by default). An exception is thrown and the application shuts down. The commit interval is set to the default (60 s).
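A simplified sketch of what the processor does (class names and the delimiter are illustrative, not my real code):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class AggregatingProcessor extends AbstractProcessor<String, String> {

    // In-memory buffer of everything consumed since the last punctuation.
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Emit one combined output record every 60 seconds of wall-clock time.
        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            if (!buffer.isEmpty()) {
                // This send fails if the combined record exceeds the broker's
                // message size limit, and the application shuts down.
                context.forward(null, String.join(",", buffer));
                buffer.clear();
            }
        });
    }

    @Override
    public void process(String key, String value) {
        buffer.add(value); // nothing is forwarded here
    }
}
```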

In such a case I would expect that on the next run all messages consumed during the 60 seconds preceding the crash would be re-consumed. But in reality the offsets of those messages are committed and the messages are not processed again on the next run.

Reading answers to similar questions, it seems to me that the offset should not be committed. When I increase the commit interval to 120 seconds (the processor still punctuates every 60 seconds), it works as expected and the offset is not committed.

I am using the default processing guarantee, but I have also tried exactly_once. Both have the same result. Calling context.commit() from the processor seems to have no effect on the issue.

Am I doing something wrong here?


2 Answers

1 vote

The contract of a Processor in Kafka Streams is that you fully process an input record and forward() all corresponding output messages before process() returns. This contract implies that Kafka Streams is allowed to commit the corresponding offset after process() returns.

It seems you "buffer" messages in memory within process() to emit them later. This violates the contract. If you want to "buffer" messages, you should attach a state store to the Processor and put all those messages into the store (cf. https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#state-stores). The store is managed by Kafka Streams for you and it is fault-tolerant. This way, after an error the state is recovered and you don't lose any data (even if the input messages are not reprocessed).
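A rough sketch of that pattern, assuming a key-value store named "pending-messages" has been registered on the topology (e.g. via Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("pending-messages"), Serdes.String(), Serdes.String()) and Topology#addStateStore); adapt the key/value types to your data:

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class BufferingProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> pending;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        pending = (KeyValueStore<String, String>) context.getStateStore("pending-messages");

        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // Drain the store and emit everything buffered since the last run.
            try (KeyValueIterator<String, String> it = pending.all()) {
                while (it.hasNext()) {
                    KeyValue<String, String> entry = it.next();
                    context.forward(entry.key, entry.value);
                    pending.delete(entry.key);
                }
            }
        });
    }

    @Override
    public void process(String key, String value) {
        // Buffer in the fault-tolerant store instead of in memory, so the
        // record survives a crash even after its input offset is committed.
        pending.put(key, value);
    }
}
```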

I doubt that setting the commit interval to 120 seconds actually works as expected for all cases, because there is no alignment between when a commit happens and when punctuation is called.

-1 votes

Some of this will depend on the client you are using and whether it is based on librdkafka. Some of the answer will also depend on how you are looping over the poll method. A typical example looks like the code under "Automatic Offset Committing" in https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html, but this assumes quite a rapid poll loop (100 ms plus processing time) and an auto.commit.interval.ms of 1000 ms (the default is usually 5000 ms).
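For reference, the poll loop in that example looks roughly like this (paraphrased; the topic names and deserializers are placeholders):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");      // commit offsets automatically...
        props.setProperty("auto.commit.interval.ms", "1000"); // ...at most every second
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            // A rapid poll loop: each iteration takes ~100 ms plus processing time.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}
```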

If I read your question correctly, you seem to be consuming messages once per 60 seconds?

Something to be aware of is that the behavior of the Kafka client is quite tied to how frequently poll is called (some libraries wrap poll inside something like a "Consume" method). Calling poll frequently is important in order to appear "alive" to the broker. You will get other exceptions if you do not poll at least every max.poll.interval.ms (default 5 minutes), and it can lead to clients being kicked out of their consumer groups.

Anyway, to the point: auto.commit.interval.ms is just a maximum. If a message has been accepted/acknowledged or StoreOffset has been used, then, on poll, the client can decide to update the offset on the broker, perhaps because a client-side buffer size was hit or for some other reason.

Another thing to look at (especially if using a librdkafka-based client; others have something similar) is enable.auto.offset.store (default true). This will "automatically store offset of last message provided to application", so every time you poll/consume a message from the client it will call StoreOffset. If you also use auto-commit, your offset may move in ways you might not expect.

See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for the full set of config for librdkafka.

There are many, many ways of consuming and acknowledging. I think for your case, the note for max.poll.interval.ms on the config page might be relevant:

" Note: It is recommended to set enable.auto.offset.store=false for long-time processing applications and then explicitly store offsets (using offsets_store()) after message processing "

Sorry that this "answer" is a bit long-winded. I hope there are some threads for you to pull on.