Good analysis. You make a very good point: if possible, you should certainly let Kafka handle the partition assignment to consumers.
There is an alternative to consumer.Assign(Partition[]). Kafka will notify your consumers when a partition is revoked from or assigned to the consumer. For example, the dotnet client library's ConsumerBuilder has 'SetPartitionsRevokedHandler' and 'SetPartitionsAssignedHandler' methods for registering handlers that consumers can use to manage their offsets.
When a partition is revoked, persist your last processed offset for each partition being revoked to the database. When a new partition is assigned, get the last processed offset for that partition from the database and resume from there.
C# Example:
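    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Confluent.Kafka;

    public class Program
    {
        public static void Main(string[] args)
        {
            // Placeholder settings; substitute your own brokers, group and topic.
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "my-group",
                EnableAutoCommit = false // offsets live in the database, not in Kafka
            };

            using (var consumer = new ConsumerBuilder<string, string>(config)
                .SetErrorHandler(ErrorHandler)
                .SetPartitionsRevokedHandler(HandlePartitionsRevoked)
                .SetPartitionsAssignedHandler(HandlePartitionsAssigned)
                .Build())
            {
                consumer.Subscribe("my-topic");
                while (true)
                {
                    var result = consumer.Consume();
                    // Process 'result' here.
                }
            }
        }

        private static void ErrorHandler(IConsumer<string, string> consumer, Error error)
        {
            Console.Error.WriteLine(error.Reason);
        }

        // Called just before partitions are taken away from this consumer.
        private static IEnumerable<TopicPartitionOffset> HandlePartitionsRevoked(
            IConsumer<string, string> consumer,
            List<TopicPartitionOffset> currentTopicPartitionOffsets)
        {
            // Persist is your own database code (not shown): save the last processed
            // offset for each partition in 'currentTopicPartitionOffsets'.
            Persist(currentTopicPartitionOffsets);
            // Nothing should stay assigned after a revoke, so return an empty set.
            return Enumerable.Empty<TopicPartitionOffset>();
        }

        // Called when partitions are assigned to this consumer.
        private static IEnumerable<TopicPartitionOffset> HandlePartitionsAssigned(
            IConsumer<string, string> consumer,
            List<TopicPartition> tps)
        {
            // FetchOffsetsFromDbForTopicPartitions is your own database code (not shown).
            List<TopicPartitionOffset> tpos = FetchOffsetsFromDbForTopicPartitions(tps);
            return tpos; // the consumer starts reading each partition from these offsets
        }
    }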
Java Example from the ConsumerRebalanceListener Docs:
If writing in Java, there is a 'ConsumerRebalanceListener' interface that you can implement. You then pass your implementation of the interface into the consumer.subscribe(topics, listener) method. The example below is taken verbatim from the Kafka docs linked above:
    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private Consumer<?,?> consumer;

        public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
            this.consumer = consumer;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // save the offsets in an external store using some custom code not described here
            for(TopicPartition partition: partitions)
                saveOffsetInExternalStore(consumer.position(partition));
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // read the offsets from an external store using some custom code not described here
            for(TopicPartition partition: partitions)
                consumer.seek(partition, readOffsetFromExternalStore(partition));
        }
    }
If my understanding is correct, you would call the Java version like this: consumer.subscribe(Collections.singletonList("my-topic"), new SaveOffsetsOnRebalance(consumer));
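The docs example leaves saveOffsetInExternalStore and readOffsetFromExternalStore undefined. As a rough sketch of what such a store might look like, the class below keeps positions in memory, keyed by topic and partition. The name InMemoryOffsetStore and its methods are hypothetical, it uses only the JDK, and a real implementation would write to your database instead of a map.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the external store; a real one would be backed by a database.
class InMemoryOffsetStore {
    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Save the position (offset of the next record to read) for a partition.
    void save(String topic, int partition, long position) {
        positions.put(key(topic, partition), position);
    }

    // Return the saved position, or empty if this partition has never been saved.
    OptionalLong read(String topic, int partition) {
        Long position = positions.get(key(topic, partition));
        return position == null ? OptionalLong.empty() : OptionalLong.of(position);
    }
}
```

In the listener, onPartitionsRevoked could call save(partition.topic(), partition.partition(), consumer.position(partition)), and onPartitionsAssigned could seek to the value returned by read when one is present.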
For more information, see the 'Storing Offsets Outside Kafka' section of the Kafka docs.
Here's an excerpt from those docs that summarizes how to store the partitions and offsets for exactly-once processing:
Each record comes with its own offset, so to manage your own offset
you just need to do the following:
- Configure enable.auto.commit=false
- Use the offset provided with each ConsumerRecord to save your position.
- On restart restore the position of the consumer using seek(TopicPartition, long).
This type of usage is simplest when the partition assignment is also
done manually (this would be likely in the search index use case
described above). If the partition assignment is done automatically
special care is needed to handle the case where partition assignments
change. This can be done by providing a ConsumerRebalanceListener
instance in the call to subscribe(Collection,
ConsumerRebalanceListener) and subscribe(Pattern,
ConsumerRebalanceListener). For example, when partitions are taken
from a consumer the consumer will want to commit its offset for those
partitions by implementing
ConsumerRebalanceListener.onPartitionsRevoked(Collection). When
partitions are assigned to a consumer, the consumer will want to look
up the offset for those new partitions and correctly initialize the
consumer to that position by implementing
ConsumerRebalanceListener.onPartitionsAssigned(Collection).
Another common use for ConsumerRebalanceListener is to flush any
caches the application maintains for partitions that are moved
elsewhere.
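One detail worth spelling out from the steps in the excerpt: seek(TopicPartition, long) takes the offset of the next record to read, so if you save the offset of the last record you processed, the position to restore is that offset plus one. The sketch below simulates this with a plain list standing in for a single partition. No Kafka client is involved, and ResumeSimulation and its members are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: a List stands in for one partition, and the list index is the record offset.
class ResumeSimulation {
    // Stands in for the external store; -1 means nothing has been processed yet.
    private long lastProcessedOffset = -1;
    final List<String> processed = new ArrayList<>();

    // Process up to maxRecords records, starting where the previous run stopped.
    void run(List<String> partition, int maxRecords) {
        // Equivalent of seek(): resume at the record after the last processed one.
        long position = lastProcessedOffset + 1;
        for (int n = 0; n < maxRecords && position < partition.size(); n++, position++) {
            processed.add(partition.get((int) position));
            lastProcessedOffset = position; // save the offset that came with the record
        }
    }
}
```

Calling run twice models a restart or a rebalance: the second call picks up at the saved offset plus one, so no record is processed twice and none is skipped.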