
I'm using spring-kafka 1.2.2.RELEASE. The container currently has a retry template configured with an AlwaysRetryPolicy and no BackOffPolicy. The ack mode is MANUAL_IMMEDIATE.

When a SIGTERM is received, I let the current message finish processing. When the @KafkaListener is then called again with a new message, I throw a RuntimeException; the container retries indefinitely and the exception is thrown continuously. After some time a SIGKILL is issued and the container is stopped (I think there is a better way to do this). With this process, however, the message that was being retried is fetched again by the consumer after the restart, but it is committed without the @KafkaListener being called. See offset=13 in the output below.

Log output:

[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator  onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}

Also, is there a better way to stop the container when a SIGTERM is issued, so that the @KafkaListener is not called with further messages? I know that later versions (>2.0.0) of spring-kafka have a KafkaListenerEndpointRegistry that can stop all consumers, but upgrading to 2.0.0 is not currently possible.

Any help is greatly appreciated.


1 Answer


The endpoint registry has been there since 1.0. 1.x users are recommended to upgrade to the latest 1.3.x; see here.

When stopping the container from within a listener, it's better to do it on a new thread; otherwise the stop will be delayed.
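As a rough illustration of stopping on a separate thread, here is a minimal sketch that uses only the JDK. `AsyncStopper` and the thread name `container-stopper` are hypothetical names made up for this example; the `Runnable` passed in stands for whatever actually stops your containers (in a real application that might be a call to the endpoint registry's `stop()`, which is an assumption about your setup).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: runs a stop action at most once, on its own thread. */
class AsyncStopper {

    // Stand-in for the real stop call (assumption: e.g. registry::stop).
    private final Runnable stopAction;
    private final AtomicBoolean requested = new AtomicBoolean(false);
    private final CountDownLatch done = new CountDownLatch(1);

    AsyncStopper(Runnable stopAction) {
        this.stopAction = stopAction;
    }

    /** Starts the stop thread; returns true only for the first call. */
    boolean requestStop() {
        if (!requested.compareAndSet(false, true)) {
            return false; // a stop is already in progress or finished
        }
        Thread stopper = new Thread(() -> {
            try {
                stopAction.run();
            } finally {
                done.countDown();
            }
        }, "container-stopper");
        stopper.start(); // the caller (e.g. the listener thread) is not blocked
        return true;
    }

    boolean isStopRequested() {
        return requested.get();
    }

    /** Waits up to timeoutMillis for the stop action to complete. */
    boolean awaitStopped(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The listener would call `requestStop()` and return immediately, so the consumer thread is free to finish its current poll loop while the stop proceeds in the background.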

See the 2.1.x ContainerStoppingErrorHandler for how to do that. But, of course, you don't need to throw an exception after stopping.

But with 1.x, you will need to discard any subsequent messages that were already fetched.
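One way to discard already-fetched messages is a "stopping" flag that the listener checks before doing any work. The sketch below is JDK-only and not the actual Spring Kafka API: `DiscardingListener`, `markStopping()` and the nested `Ack` interface are hypothetical stand-ins (`Ack` plays the role of the acknowledgment handle you get in MANUAL_IMMEDIATE mode). The idea is that a discarded record is neither processed nor acknowledged, so its offset should not be committed by the listener and the record should be delivered again after a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical listener that drops records once shutdown has begun. */
class DiscardingListener {

    /** Stand-in for a manual acknowledgment handle (assumption). */
    interface Ack {
        void acknowledge();
    }

    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final List<String> processed = new ArrayList<>();

    /** Call this when shutdown starts, e.g. from a JVM shutdown hook. */
    void markStopping() {
        stopping.set(true);
    }

    /** Returns true if the record was processed, false if it was discarded. */
    boolean onMessage(String payload, Ack ack) {
        if (stopping.get()) {
            // Discard: no processing and no ack, so no offset is committed here.
            return false;
        }
        processed.add(payload);  // placeholder for the real business logic
        ack.acknowledge();       // commit only after successful processing
        return true;
    }

    List<String> processed() {
        return processed;
    }
}
```

Compared with throwing a RuntimeException under an AlwaysRetryPolicy, this returns quickly for every remaining record, so the container is not stuck in an endless retry loop while it is being stopped.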