I'm using Kafka 2.3.1 (server) with spring-kafka 2.2.14 and kafka-clients 2.0.1.

I have a topic with 10 partitions.

Producer

One producer publishes messages to the Kafka cluster, partitioned by key, because per-key message ordering matters.

acks = all

Consumer

Several instances of the same consumer (scaling horizontally from 2 to 5 in k8s), each with a single thread, pull these messages and process them.

Message processing time varies unpredictably.

To avoid frequent group rebalancing, I moved message processing to another thread. The consumer keeps calling poll while the processor is still working.

auto.offset.reset = earliest
enable.auto.commit = false
max.poll.interval.ms = 300000
max.poll.records = 1
AckMode.MANUAL_IMMEDIATE    
syncCommits = true

Execution example

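The 10 partitions were assigned to consumer-0 (0 to 4) and consumer-1 (5 to 9). New consumers are added during processing, and each addition triggers a rebalance. After a rebalance I can no longer resume the partitions I paused earlier: consumer.paused() is empty by then, as the "Resumed Partitions []" lines in the log below show.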

message-1 was consumed twice. Because processing takes longer than max.poll.interval.ms, rebalancing reassigns the partitions. By the time the commit is triggered, the partition may already belong to another consumer, yet the commit still takes effect. The rebalance probably resumes all paused partitions, so the new owner of the partition, which has not paused it, is able to pull the message again.

consumer-1 [partition-5 offset-12] Received Message [message-1]
consumer-1 [partition-5 offset-12] Paused Partitions [partition-5, partition-8, partition-9, partition-6, partition-7]
consumer-2 Started
consumer-2 [partition-3 offset-80] Received Message [message-2]
consumer-2 [partition-3 offset-80] Paused Partitions [partition-2, partition-3, partition-0, partition-1]
consumer-3 Started
consumer-3 [partition-7 offset-43] Received Message [message-3]
consumer-3 [partition-7 offset-43] Paused Partitions [partition-6, partition-7]
consumer-4 Started
consumer-4 [partition-5 offset-12] Received Message [message-1]
consumer-4 [partition-5 offset-12] Paused Partitions [partition-4, partition-5]
consumer-1 [partition-5 offset-12] Executed Task 
consumer-1 [partition-5 offset-12] Committed [message-1]
consumer-1 [partition-5 offset-12] Resumed Partitions []
consumer-2 [partition-3 offset-80] Executed Task
consumer-2 [partition-3 offset-80] Committed [message-2]
consumer-2 [partition-3 offset-80] Resumed Partitions []
consumer-3 [partition-7 offset-43] Executed Task
consumer-3 [partition-7 offset-43] Committed [message-3]
consumer-3 [partition-7 offset-43] Resumed Partitions []
consumer-4 [partition-5 offset-12] Executed Task
consumer-4 [partition-5 offset-12] Committed [message-1]
consumer-4 [partition-5 offset-12] Resumed Partitions [partition-4, partition-5]

Consumer implementation in Groovy

    @KafkaListener(topics = '${topic.files}', 
                   containerFactory = "fileKafkaListenerContainerFactory")
    void receive(@Payload FileEvent event,
                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                 @Header(KafkaHeaders.OFFSET) Long offset,
                 Acknowledgment ack,
                 Consumer consumer) {
        LOGGER.info("[partition-{} offset-{}] Received Message [{}]", partition, offset, event)
        try {
            LOGGER.debug("[partition-{} offset-{}] Paused Partitions [{}]", partition, offset, consumer.assignment())
            consumer.pause(consumer.assignment())
            ListenableFuture<Void> future = applicationTaskExecutor.submitListenable(callable(event))
            future.addCallback({ }, { ex -> throw ex }  )
            while (!future.isDone()) {
                Thread.sleep(500)
                consumer.poll(Duration.ofMillis(3_000))
            }
            future.get()
            ack.acknowledge()
            LOGGER.debug("[partition-{} offset-{}] Committed [{}]", partition, offset, event)
        } catch (Exception cause) {
            String message = String.format("Fail to consume partition=%s offset=%s %s", partition, offset, event)
            throw new RuntimeException(message, cause)
        } finally {
            LOGGER.debug("[partition-{} offset-{}] Resumed Partitions [{}]", partition, offset, consumer.paused())
            consumer.resume(consumer.paused())
        }
    }
    
    Callable<Void> callable(FileEvent event) {
        { ->
            indexService.index(event)
        }
    }

How can I pause and resume properly for long-running processing, given the rebalancing that occurs whenever I add a new consumer?

1 Answer

Don't call pause() / resume() on the consumer itself; pause the listener container instead (and set max.poll.records to 1).

The container has logic to re-pause after a rebalance.

    if (ListenerConsumer.this.consumerPaused) {
        ListenerConsumer.this.consumer.pause(partitions);
        ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
                + "consumer paused again, so the initial poll() will never return any records");
    }
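
As a minimal sketch of pausing the container rather than the consumer, the listener below looks up its own container in the `KafkaListenerEndpointRegistry`, calls `pause()`, hands the work to another thread and returns, so the container thread keeps polling. The listener id `fileListener`, the single-thread `worker`, and the `FileEvent` / `IndexService` stubs are assumptions standing in for the question's own types. A manual ack mode is assumed, as in the question, and acknowledging from the worker thread is assumed to be handed back to the consumer thread by the container, which is worth checking for your version. Error handling and redelivery of a failed record are left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Hypothetical stand-ins for the question's own types.
class FileEvent { }

interface IndexService {
    void index(FileEvent event);
}

@Component
class FileEventListener {

    // Hypothetical listener id, used only to find the container in the registry.
    private static final String LISTENER_ID = "fileListener";

    private final KafkaListenerEndpointRegistry registry;
    private final IndexService indexService;
    // Assumption: one worker thread, so at most one record is processed at a time.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    FileEventListener(KafkaListenerEndpointRegistry registry, IndexService indexService) {
        this.registry = registry;
        this.indexService = indexService;
    }

    // idIsGroup = false keeps the group.id configured on the consumer factory.
    @KafkaListener(id = LISTENER_ID, idIsGroup = false, topics = "${topic.files}",
                   containerFactory = "fileKafkaListenerContainerFactory")
    public void receive(FileEvent event, Acknowledgment ack) {
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        // Request a pause; the container pauses the consumer before its next poll.
        // With max.poll.records = 1 no further record is delivered in between.
        container.pause();
        worker.submit(() -> {
            try {
                indexService.index(event);
                ack.acknowledge();
            } finally {
                // Resume whether or not indexing succeeded; a failed record is
                // simply left unacknowledged in this sketch.
                container.resume();
            }
        });
        // Returning here lets the container thread go back to poll().
    }
}
```

Because the listener returns immediately, there is no need to call `consumer.poll()` by hand inside it, and the re-pause after a rebalance is left to the container.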

If you really want to do all this yourself, set max.poll.records to 1 and use a rebalance listener to re-pause the consumer.
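
For the do-it-yourself route, the bookkeeping could look roughly like this dependency-free model. `PausableConsumer`, `RePausingRebalanceHandler` and `RecordingConsumer` are hypothetical names, and partitions are plain integers to keep the sketch free of Kafka imports. In a real listener the callback would be `ConsumerAwareRebalanceListener.onPartitionsAssigned(Consumer, Collection<TopicPartition>)` and the pause call `Consumer.pause(Collection<TopicPartition>)`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the single Kafka Consumer method this sketch needs.
interface PausableConsumer {
    void pause(Collection<Integer> partitions);
}

// Remembers whether a long-running task is in flight and re-pauses on assignment.
class RePausingRebalanceHandler {
    private volatile boolean taskInFlight;

    void taskStarted() {
        taskInFlight = true;
    }

    void taskFinished() {
        taskInFlight = false;
    }

    // Plays the role of onPartitionsAssigned in a rebalance listener.
    void onPartitionsAssigned(PausableConsumer consumer, Collection<Integer> partitions) {
        if (taskInFlight) {
            // Newly assigned partitions arrive unpaused, so pause them again.
            consumer.pause(partitions);
        }
    }
}

// In-memory consumer that only records which partitions are paused.
class RecordingConsumer implements PausableConsumer {
    final Set<Integer> paused = new LinkedHashSet<>();

    @Override
    public void pause(Collection<Integer> partitions) {
        paused.addAll(partitions);
    }
}

class RePauseDemo {
    public static void main(String[] args) {
        RePausingRebalanceHandler handler = new RePausingRebalanceHandler();
        RecordingConsumer consumer = new RecordingConsumer();

        handler.taskStarted();
        // A rebalance hands over partitions 4 and 5 while the task is still running.
        handler.onPartitionsAssigned(consumer, Arrays.asList(4, 5));
        System.out.println("paused after rebalance: " + consumer.paused);
    }
}
```

The listener would call `taskStarted()` before pausing and `taskFinished()` before resuming, so that an assignment arriving while the consumer is idle is left unpaused.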