I'm using spring-kafka 1.2.2.RELEASE. The listener container is configured with a RetryTemplate that uses AlwaysRetryPolicy and no BackOffPolicy. The ack mode is MANUAL_IMMEDIATE.
When a SIGTERM is received, I let the current message finish processing. When @KafkaListener is called again with a new message, I throw a RuntimeException; the container retries indefinitely, so the exception is thrown continuously. After some time SIGKILL is issued and the container is stopped (I think there is a better way to do this). The problem is that after the restart the consumer retrieves the message that was being retried, but its offset is committed without @KafkaListener being called. See offset=13 in the stack trace below.
Stack trace:
[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}
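For reference, the SIGTERM handling described above works roughly like the following plain-Java sketch. It is a simplification that uses only the JDK: `ShutdownGate`, `markShuttingDown` and `onMessage` are hypothetical names, and `onMessage` stands in for the body of the real @KafkaListener method (returning true where the real code would call the manual acknowledgment).

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: a flag flipped on JVM shutdown that the listener checks.
final class ShutdownGate {
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    // SIGTERM makes the JVM run its shutdown hooks; SIGKILL does not.
    public void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> shuttingDown.set(true)));
    }

    // Lets the flag be set directly, e.g. to simulate SIGTERM in a test.
    public void markShuttingDown() {
        shuttingDown.set(true);
    }

    public boolean isShuttingDown() {
        return shuttingDown.get();
    }

    // Stand-in for the listener body. Returns true when the record was
    // processed (the point where the real listener would acknowledge).
    public boolean onMessage(String value) {
        if (isShuttingDown()) {
            // With AlwaysRetryPolicy this exception is retried forever,
            // so the record is never acknowledged before SIGKILL.
            throw new IllegalStateException("Shutting down, refusing record: " + value);
        }
        // ... normal processing would happen here ...
        return true;
    }

    public static void main(String[] args) {
        ShutdownGate gate = new ShutdownGate();
        gate.installShutdownHook();
        System.out.println("processed: " + gate.onMessage("first"));
        gate.markShuttingDown(); // simulate SIGTERM arriving
        try {
            gate.onMessage("second");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The intent is that a record arriving after SIGTERM is never acknowledged, so it should be redelivered to the listener after the restart.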
Also, is there a better way to stop the container when SIGTERM is issued, so that @KafkaListener is not called with further messages? I know that later versions (>2.0.0) of spring-kafka have KafkaListenerEndpointRegistry, which can stop all consumers, but upgrading to 2.0.0 is not currently possible.
Any help is greatly appreciated.