
We have a requirement that, across all the nodes in the cluster that make up a consumer group, the messages for a given Kafka partition are always processed one message at a time with no overlap. It is tolerable for them to be processed (slightly) out of order, but no temporal overlap is tolerated.

How can we stay safe during a rebalance? For example, let's say we autoscale our consumers and start up a new consumer in the same consumer group. The new consumer will then have to take over partitions from the existing consumers in that group.

For a specific partition P, let us say that consumer c1 was previously processing P. Because of the rebalance it now has to give P up, and another consumer c2 takes over the same partition P.

My question is: how do we guarantee the requirement above? It would not be acceptable for consumer c2 to start processing messages from partition P while consumer c1 is still processing messages from the same partition P.

I can envision some situations where this is violated, e.g. a consumer stops sending heartbeats to ZooKeeper, which triggers the rebalance, yet the consumer is not really dead and continues processing. Let's exclude that case; we will have to live with it, and it is not very likely anyway.

Answer

I think this use case is very common nowadays. It can be achieved by passing a custom implementation of the ConsumerRebalanceListener interface when subscribing to a topic:

KafkaConsumer#subscribe(Collection<String> topics, ConsumerRebalanceListener callback)

The ConsumerRebalanceListener.onPartitionsRevoked(Collection<TopicPartition> partitions) method is called before a rebalance operation starts and after the consumer stops fetching data. It is recommended to commit offsets in this callback to prevent duplicate processing.
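The listener callbacks run on the consumer's own thread, inside poll(), and as far as I understand the group protocol, P is not handed to c2 until c1 has returned from onPartitionsRevoked and rejoined the group. So if c1 processes records on worker threads, it has to wait for that in-flight work inside onPartitionsRevoked before committing; otherwise c2 can start on P while c1's workers are still busy. The sketch below uses only the JDK and assumes a hypothetical helper, PartitionWorkers, with one single-threaded executor per partition (partitions are plain Strings standing in for TopicPartition). The drain timeout is an assumption too; it should stay well below max.poll.interval.ms, because a member that takes too long to rejoin can be removed from the group, which reopens the overlap window.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Kafka): one single-threaded executor per partition,
 * so this node processes at most one message per partition at a time.
 * Both methods are meant to be called only from the consumer's poll thread.
 *
 * Wiring inside a real ConsumerRebalanceListener (shown as comments, not compiled here;
 * names(...) is a hypothetical TopicPartition-to-String conversion):
 *   public void onPartitionsRevoked(Collection<TopicPartition> parts) {
 *       workers.drain(names(parts), 10_000); // wait for in-flight records of revoked partitions
 *       consumer.commitSync();               // then commit what has been processed
 *   }
 */
final class PartitionWorkers {
    private final Map<String, ExecutorService> workers = new ConcurrentHashMap<>();

    /** Queue one record's processing; tasks for the same partition run strictly one after another. */
    void submit(String partition, Runnable task) {
        workers.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor()).submit(task);
    }

    /**
     * Stop accepting work for the given partitions and block until every queued and
     * in-flight task for them has finished. The timeout applies per partition.
     * Returns false if any partition failed to drain in time.
     */
    boolean drain(Collection<String> partitions, long timeoutMs) throws InterruptedException {
        boolean allDrained = true;
        for (String partition : partitions) {
            ExecutorService worker = workers.remove(partition);
            if (worker == null) {
                continue; // nothing was ever submitted for this partition
            }
            worker.shutdown(); // reject new tasks but let already-queued ones run
            if (!worker.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                allDrained = false;
            }
        }
        return allDrained;
    }
}
```

Draining the whole queue, rather than cancelling tasks that have not started, keeps the commit simple: once every record returned by poll() for P has been processed, a plain commitSync() commits the correct position. If P is later assigned back to this consumer, submit() simply creates a fresh executor.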

The ConsumerRebalanceListener.onPartitionsAssigned(Collection<TopicPartition> partitions) method will be called after the partition re-assignment completes and before the consumer starts fetching data. You can load the state/offset for the assigned partition here.
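When offsets are committed to Kafka itself, the new owner resumes from the committed offset automatically and nothing has to be loaded here. Loading a position in onPartitionsAssigned matters when offsets or state live in an external store. The JDK-only sketch below models that handoff with a hypothetical in-memory ExternalOffsetStore; in a real listener, c2 would pass the returned value to consumer.seek(topicPartition, offset). It follows Kafka's convention that the stored offset is the next record to read, i.e. the last processed offset + 1.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical external offset store (in a real system this might be a database
 * updated in the same transaction as the processing results).
 * Stored values are the offset of the NEXT record to read.
 */
final class ExternalOffsetStore {
    private final Map<String, Long> nextOffsets = new ConcurrentHashMap<>();

    /** Old owner (c1), in onPartitionsRevoked, after its in-flight work has drained. */
    void commit(String partition, long lastProcessedOffset) {
        nextOffsets.put(partition, lastProcessedOffset + 1);
    }

    /**
     * New owner (c2), in onPartitionsAssigned. Empty means no stored position,
     * in which case the consumer's auto.offset.reset policy would decide where to start.
     */
    OptionalLong startingOffset(String partition) {
        Long offset = nextOffsets.get(partition);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```

For example, if c1 last processed offset 41 of P before revocation, c2 would seek to 42 and neither skip nor repeat a record.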