
I have a question, mostly to make sure I understand things so I can implement this the right way. The requirement is to stop the ConcurrentMessageListenerContainer by invoking its stop() method, which out of the box iterates over the KafkaMessageListenerContainers (based on the configured concurrency) and invokes stop() for each consumer thread.

Just FYI, I am on 1.3.5 and cannot migrate to 2.x because I am tied to Spring Boot 1.5.x.

Configuration: let's say I have a topic with 5 partitions and concurrency set to 5 as well. I am using a batch listener with a batch size of 100 records per poll.
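For reference, a configuration along these lines might look like the sketch below (assuming spring-kafka 1.3.x; the bean name and property values are illustrative, not taken from my actual code):

```java
// Sketch only: assumes spring-kafka 1.3.x; names and values are illustrative.
// The consumer properties would also set ConsumerConfig.MAX_POLL_RECORDS_CONFIG = 100.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(5);      // one KafkaMessageListenerContainer per partition
    factory.setBatchListener(true); // deliver the whole poll as one List
    factory.getContainerProperties()
            .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); // manual offset commits
    return factory;
}
```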

Question: when we invoke stop() on the container, it appears that internally it sets running to false for each KafkaMessageListenerContainer and calls wakeup() on the listener consumer:

setRunning(false);
this.listenerConsumer.consumer.wakeup();

During testing, by invoking stop() on the container from a separate thread, I observed the following:

1) It stops the listener consumer; that part is definitely working. No more polling happens after calling stop().
2) If any listener consumer has already polled 100 records and is in the middle of processing them, it completes that execution before stopping.

Is #2 by design, i.e. does invoking container stop() only send a wakeup to prevent the next poll? I ask because I don't see any handling of the following in KafkaMessageListenerContainer.run():

catch (WakeupException e) {
     //Ignore, we're stopping
}

One more thing: even in Spring Kafka 2.1, the ContainerStoppingBatchErrorHandler calls the same container stop(), so I guess it's more a question of my understanding of how to handle this scenario.

To conclude (apologies if the above is too much detail): I want to terminate the listener thread if stop() has been invoked from a separate thread. I use manual offset commits, so replaying the batch is fine.

Hi Gary,

As you suggested having a consumer-aware listener: my question is specifically about the listener being stopped through the container. Once the container invokes stop(), my expectation is that the listener thread (here a batch listener) would be interrupted mid-execution. I know the entire poll of records has already been received by the listener, and the question is not about losing offsets, since the ack is at batch level.


1 Answer


It's not clear what your issue is; the container stops when the listener returns after processing the batch.

If you want to manage offsets at the record level, don't use a BatchMessageListener; use a record-level MessageListener instead.

With the former, the whole batch of records returned by the poll is considered a unit of work; offsets for the entire batch are committed, or not.
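As a sketch of that unit-of-work behavior (assuming spring-kafka 1.3.x with AckMode.MANUAL; the topic name, factory name, and process() helper are illustrative):

```java
// Sketch: batch listener with a single manual ack for the entire poll.
// "myTopic", "batchFactory" and process() are illustrative names.
@KafkaListener(topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, String> record : records) {
        process(record); // your per-record logic
    }
    ack.acknowledge(); // commits offsets for the whole batch; if we threw above, nothing commits
}
```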

Even with a record-level listener, the container will not stop until the current batch of records has been sent to the listener; in that case, what you should do depends on the acknowledgment mode: with manual acks, simply ignore those records that are received after the stop. If the container manages the acks, throw an exception for the records you want to discard (with ackOnError=false; the default is true).
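The "ignore records received after the stop" idea can be sketched independently of the framework. Here, stopRequested stands in for whatever signal your stopping thread sets before calling container.stop(); the class and flag are hypothetical, not spring-kafka API:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not spring-kafka API: the stopping thread sets the flag
// before calling container.stop(); the listener checks it between records and
// abandons the rest of the batch. With manual offset commits, the abandoned
// records are simply replayed after a restart.
class StopAwareBatchProcessor {

    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** Called by the thread that is about to invoke container.stop(). */
    void requestStop() {
        stopRequested.set(true);
    }

    /** Returns the number of records actually processed. */
    int process(List<String> batch) {
        int processed = 0;
        for (String record : batch) {
            if (stopRequested.get()) {
                break; // skip the remainder; no ack, so they replay on restart
            }
            // ... real per-record work would go here ...
            processed++;
        }
        return processed;
    }
}
```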

It's unfortunate that you can't move to a more recent release.

With 2.1 there is much more flexibility. For example, the SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler are provided.

The SeekToCurrentErrorHandler extends RemainingRecordsErrorHandler, which means the remaining records in the poll are sent to the error handler instead of the listener; when the container detects a RemainingRecordsErrorHandler, the listener won't get the remaining records in the batch, and the handler can decide what to do: stop the container, perform seeks, etc.
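Wiring that up is a one-liner on the container properties; a sketch, assuming spring-kafka 2.1.x and a factory like the one above (bean names are illustrative):

```java
// Sketch: assumes spring-kafka 2.1.x; "factory" is an illustrative
// ConcurrentKafkaListenerContainerFactory bean.
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
// or, for a batch listener:
factory.getContainerProperties().setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
```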

So you no longer need to stop the container when a bad record is received.

This is all designed for handling errors/stopping the container due to some data error.

There is currently no stopNow() option on the containers, i.e. immediately stop calling the listener after the current record is processed and discard any remaining records so that they are re-delivered after a restart. Currently, you have to discard them yourself, as I described above.

We could consider adding such an option but it would be a 2.2 feature at the earliest.

We only provide bug fixes in 1.3.x and, then, only if there is no work around.

It's open source, so you can always fork it yourself, and make contributions back to the framework.