I have one question, mostly to make sure I understand the behavior so I can implement this the right way. The requirement is to stop the ConcurrentMessageListenerContainer by invoking stop on the container, which out of the box iterates over its KafkaMessageListenerContainers (as many as the configured concurrency) and invokes stop on each one, i.e. once per consumer thread.
Just FYI, I am on 1.3.5 and cannot migrate to 2.* because I am on Spring Boot 1.5.*.
Configuration: let's say I have a topic with 5 partitions and concurrency also set to 5. I am using a batch listener, with the batch size set to 100 records per poll.
Question: when we invoke stop on the container, it appears that internally it sets running to false for each KafkaMessageListenerContainer and calls wakeup on the listenerConsumer's consumer:
setRunning(false);
this.listenerConsumer.consumer.wakeup();
During testing, when I invoke stop on the container from a separate thread, I observe the following:
1) It stops the listenerConsumer, and that definitely works. No more polling happens after calling stop.
2) If a listenerConsumer has already polled 100 records and is in the middle of processing them, it completes the execution before stopping.
Is #2 by design, i.e. does invoking stop on the container only send a wakeup to prevent the next poll? I ask because in KafkaMessageListenerContainer.run() I don't see any handling of the exception below; it is simply ignored:
catch (WakeupException e) {
//Ignore, we're stopping
}
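To make observation #2 concrete, here is a minimal plain-JDK simulation of the behavior I am describing. It is only a sketch of my understanding, not the actual container code: the class name, the latches, and the loop are all hypothetical, and the assumption is that the running flag is checked only between polls, so a batch that was already handed over always runs to completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK simulation only; no Spring Kafka or Kafka client classes are used.
class StopSemanticsDemo {

    static final int BATCH_SIZE = 100; // mirrors the 100-record batches in my setup

    // Requests a stop while a batch is in flight and returns how many records were processed.
    static int runAndStopMidBatch() {
        AtomicBoolean running = new AtomicBoolean(true);   // stands in for the container's running flag
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch midBatch = new CountDownLatch(1);   // signals that the batch is in progress
        CountDownLatch stopSent = new CountDownLatch(1);   // signals that stop has been requested

        Thread consumerThread = new Thread(() -> {
            while (running.get()) {                        // assumption: flag is checked only between polls
                // Pretend a poll just returned BATCH_SIZE records for the listener.
                for (int i = 0; i < BATCH_SIZE; i++) {
                    if (i == 10) {
                        midBatch.countDown();
                        awaitQuietly(stopSent);            // guarantees the stop arrives mid-batch
                    }
                    processed.incrementAndGet();           // "process" one record
                }
            }
        });
        consumerThread.start();

        awaitQuietly(midBatch);
        running.set(false);                                // models stop(): nothing interrupts the listener
        stopSent.countDown();
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("processed = " + runAndStopMidBatch()); // prints "processed = 100"
    }
}
```

Even though the stop is requested at record 10, all 100 records are processed and no second batch is started, which matches what I see in testing.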
One more thing: even in Spring Kafka 2.1, the ContainerStoppingBatchErrorHandler calls the same container stop, so I guess this is more about my understanding of how to handle this scenario.
To conclude, in case the above is too much detail: I want to terminate the listener thread when stop is invoked from a separate thread. I use manual offset commits, so replaying the batch is fine.
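One workaround I can think of, if the container itself does not interrupt the listener, is a cooperative check inside the batch loop. The sketch below is plain JDK and purely illustrative: handleBatch, stopRequested, and the processor callback are hypothetical names, and the assumption is that whoever calls stop on the container sets the flag first. In a real batch listener the boolean result would decide whether to call Acknowledgment.acknowledge(), so an abandoned batch stays uncommitted and is replayed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Plain-JDK sketch; no Spring Kafka classes are used.
class CooperativeStopDemo {

    // Hypothetical batch handler: returns true if the whole batch was processed
    // (safe to acknowledge), false if it was abandoned because a stop was requested.
    static boolean handleBatch(List<String> records,
                               AtomicBoolean stopRequested,
                               Consumer<String> processor) {
        for (String record : records) {
            if (stopRequested.get()) {
                return false;              // bail out; do not acknowledge this batch
            }
            processor.accept(record);
        }
        return true;
    }

    // Runs a 5-record batch where the stop request arrives after the third record.
    static String demo() {
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        List<String> done = new ArrayList<>();
        List<String> batch = Arrays.asList("r0", "r1", "r2", "r3", "r4");
        boolean completed = handleBatch(batch, stopRequested, record -> {
            done.add(record);
            if (done.size() == 3) {
                stopRequested.set(true);   // simulates another thread requesting the stop
            }
        });
        return "completed=" + completed + " processed=" + done;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "completed=false processed=[r0, r1, r2]"
    }
}
```

The trade-off is that records handled before the stop (r0 to r2 here) are processed again on replay, which is acceptable in my case.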
Hi Gary,
You suggested using a consumer-aware listener, but my question is specifically about the listener being stopped through the container. Once stop is invoked on the container, I would expect the listener thread, even for a batch listener, to be interrupted mid-execution. I know the listener has already received the entire poll of records, and the question is not about losing offsets, since the ack is at the batch level.