I'm using Kafka 2.3.1 (server) with spring-kafka 2.2.14 and kafka-clients 2.0.1.
I have a topic with 10 partitions.
Producer
A single producer publishes messages to the Kafka cluster, distributing them among partitions by key, because the ordering of messages per key matters.
acks = all
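For context, a minimal sketch of how such a keyed producer could be wired with KafkaTemplate (bootstrap servers, topic name, serializers and the fileId key field are assumptions for illustration, not taken from my actual setup):

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer

// Assumed configuration; only acks=all comes from the setup above
Map<String, Object> producerProps = [
        (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)     : 'localhost:9092',
        (ProducerConfig.ACKS_CONFIG)                  : 'all',
        (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)  : StringSerializer,
        (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG): JsonSerializer
]

def template = new KafkaTemplate<String, FileEvent>(
        new DefaultKafkaProducerFactory<String, FileEvent>(producerProps))

// Records with the same key always hash to the same partition, which is what
// preserves per-key ordering; 'files' and fileId are hypothetical topic/key names
FileEvent event = new FileEvent()                 // assumed no-arg constructor, illustration only
template.send('files', event.fileId as String, event)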
Consumer
Several instances of the same consumer (scaling horizontally from 2 to 5 in k8s), each with a single thread, poll these messages and process them.
Message processing time varies unpredictably.
To avoid frequent group rebalancing, I moved message processing to another thread; the consumer keeps calling poll while the processor is still working.
auto.offset.reset = earliest
enable.auto.commit = false
max.poll.interval.ms = 300000
max.poll.records = 1
AckMode.MANUAL_IMMEDIATE
syncCommits = true
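These settings map onto the listener container factory roughly as follows (a sketch; group id, bootstrap servers and deserializers are assumptions, and note that in spring-kafka 2.2.x the AckMode enum lives on AbstractMessageListenerContainer rather than ContainerProperties):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.AbstractMessageListenerContainer
import org.springframework.kafka.support.serializer.JsonDeserializer

@Bean
ConcurrentKafkaListenerContainerFactory<String, FileEvent> fileKafkaListenerContainerFactory() {
    // Bootstrap servers and group id are assumed; the rest mirrors the settings above
    Map<String, Object> props = [
            (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)   : 'localhost:9092',
            (ConsumerConfig.GROUP_ID_CONFIG)            : 'file-consumers',
            (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)   : 'earliest',
            (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)  : false,
            (ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG): 300_000,
            (ConsumerConfig.MAX_POLL_RECORDS_CONFIG)    : 1
    ]
    def consumerFactory = new DefaultKafkaConsumerFactory<String, FileEvent>(
            props, new StringDeserializer(), new JsonDeserializer<FileEvent>(FileEvent))
    def factory = new ConcurrentKafkaListenerContainerFactory<String, FileEvent>()
    factory.consumerFactory = consumerFactory
    // In spring-kafka 2.2.x the AckMode enum is on AbstractMessageListenerContainer;
    // in newer versions it is ContainerProperties.AckMode
    factory.containerProperties.ackMode = AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE
    factory.containerProperties.syncCommits = true
    factory
}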
Execution example
The 10 partitions were initially assigned to consumer-0 (partitions 0 to 4) and consumer-1 (partitions 5 to 9). New consumers are added while processing is still running, and each addition triggers a rebalance. After that I can no longer resume the partitions that were paused before.
message-1 was consumed twice. Since processing takes longer than max.poll.interval.ms, the rebalance reassigns the partitions. By the time the commit is triggered, the partition may already be owned by another consumer, yet the commit still takes effect. The rebalance apparently resumes all paused partitions, so the new consumer that owns the partition can pull the same message again because the partition is no longer paused.
consumer-1 [partition-5 offset-12] Received Message [message-1]
consumer-1 [partition-5 offset-12] Paused Partitions [partition-5, partition-8, partition-9, partition-6, partition-7]
consumer-2 Started
consumer-2 [partition-3 offset-80] Received Message [message-2]
consumer-2 [partition-3 offset-80] Paused Partitions [partition-2, partition-3, partition-0, partition-1]
consumer-3 Started
consumer-3 [partition-7 offset-43] Received Message [message-3]
consumer-3 [partition-7 offset-43] Paused Partitions [partition-6, partition-7]
consumer-4 Started
consumer-4 [partition-5 offset-12] Received Message [message-1]
consumer-4 [partition-5 offset-12] Paused Partitions [partition-4, partition-5]
consumer-1 [partition-5 offset-12] Executed Task
consumer-1 [partition-5 offset-12] Committed [message-1]
consumer-1 [partition-5 offset-12] Resumed Partitions []
consumer-2 [partition-3 offset-80] Executed Task
consumer-2 [partition-3 offset-80] Committed [message-2]
consumer-2 [partition-3 offset-80] Resumed Partitions []
consumer-3 [partition-7 offset-43] Executed Task
consumer-3 [partition-7 offset-43] Committed [message-3]
consumer-3 [partition-7 offset-43] Resumed Partitions []
consumer-4 [partition-5 offset-12] Executed Task
consumer-4 [partition-5 offset-12] Committed [message-1]
consumer-4 [partition-5 offset-12] Resumed Partitions [partition-4, partition-5]
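To verify the assumption that the rebalance is what clears the paused state, a rebalance listener can be registered on the container properties and used to log what each consumer sees when partitions are revoked and reassigned (a diagnostic sketch, not part of my current code):

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener

// Register with: factory.containerProperties.consumerRebalanceListener = new LoggingRebalanceListener()
class LoggingRebalanceListener implements ConsumerAwareRebalanceListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingRebalanceListener)

    @Override
    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOGGER.info('Revoked {} - paused at revocation {}', partitions, consumer.paused())
    }

    @Override
    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Newly assigned partitions start unpaused, so the consumer that now owns
        // partition-5 can fetch offset-12 again even though another consumer had paused it
        LOGGER.info('Assigned {} - paused after rebalance {}', partitions, consumer.paused())
    }
}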
Consumer implementation in Groovy
@KafkaListener(topics = '${topic.files}',
        containerFactory = "fileKafkaListenerContainerFactory")
void receive(@Payload FileEvent event,
             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
             @Header(KafkaHeaders.OFFSET) Long offset,
             Acknowledgment ack,
             Consumer consumer) {
    LOGGER.info("[partition-{} offset-{}] Received Message [{}]", partition, offset, event)
    try {
        // Pause everything assigned to this consumer while the long-running task executes
        LOGGER.debug("[partition-{} offset-{}] Paused Partitions [{}]", partition, offset, consumer.assignment())
        consumer.pause(consumer.assignment())
        ListenableFuture<Void> future = applicationTaskExecutor.submitListenable(callable(event))
        future.addCallback({ }, { ex -> throw ex })
        // Keep calling poll() so the consumer is not considered dead by the group;
        // while paused, poll() should return no records
        while (!future.isDone()) {
            Thread.sleep(500)
            consumer.poll(Duration.ofMillis(3_000))
        }
        future.get()
        ack.acknowledge()
        LOGGER.debug("[partition-{} offset-{}] Committed [{}]", partition, offset, event)
    } catch (Exception cause) {
        String message = String.format("Fail to consume partition=%s offset=%s %s", partition, offset, event)
        throw new RuntimeException(message, cause)
    } finally {
        LOGGER.debug("[partition-{} offset-{}] Resumed Partitions [{}]", partition, offset, consumer.paused())
        consumer.resume(consumer.paused())
    }
}

Callable<Void> callable(FileEvent event) {
    { ->
        indexService.index(event)
    }
}
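For completeness, the applicationTaskExecutor used above is assumed to be an AsyncListenableTaskExecutor; a minimal bean definition could look like this (pool sizes are arbitrary):

import org.springframework.context.annotation.Bean
import org.springframework.core.task.AsyncListenableTaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

@Bean
AsyncListenableTaskExecutor applicationTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()
    // One worker per listener thread is enough here because max.poll.records = 1
    executor.corePoolSize = 5
    executor.maxPoolSize = 5
    executor.threadNamePrefix = 'indexing-'
    executor.initialize()
    executor
}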
How can I properly pause and resume, given the rebalancing that occurs when I add a new consumer while a long-running task is still being processed?