I was going through this article, which explains how to ensure that a message is processed exactly once by doing the following:

  • Read (topic, partition, offset) from database on start/restart
  • Read message from specific (topic, partition, offset)
  • Atomically do following things:
    • Processing message
    • Commit offset to database as (topic, partition, offset)
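
A minimal Java sketch of the atomic step, assuming an in-memory map as a stand-in for a real database transaction. TransactionalStore, nextOffset and processAndCommit are hypothetical names for illustration, not taken from the article, and upper-casing the message is only a placeholder for the real processing:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a database; a real system would wrap both
// writes in a single database transaction instead of a synchronized method.
class TransactionalStore {
    private final Map<String, Long> nextOffsets = new HashMap<>(); // key: "topic-partition"
    private final List<String> results = new ArrayList<>();

    // Offset to start reading from after a start/restart; 0 if nothing was processed yet.
    synchronized long nextOffset(String topic, int partition) {
        return nextOffsets.getOrDefault(topic + "-" + partition, 0L);
    }

    // Stores the processing result and the new position as one unit.
    // Returns false and changes nothing if the offset was already processed.
    synchronized boolean processAndCommit(String topic, int partition, long offset, String message) {
        if (offset < nextOffset(topic, partition)) {
            return false; // duplicate delivery, for example after a restart
        }
        results.add(message.toUpperCase());                    // placeholder "processing"
        nextOffsets.put(topic + "-" + partition, offset + 1);  // "commit offset to database"
        return true;
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }
}
```

Because the result and the offset change together, a message that is delivered a second time is rejected instead of being processed twice.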

As you can see, it explicitly specifies which partition to read messages from. I feel this is not a good idea, as it does not allow Kafka to assign a fair share of partitions to the active consumers. I have not been able to come up with logic that implements similar functionality without explicitly specifying partitions while polling the Kafka topic inside the consumer. Is it possible to do this?

1 Answer

Good analysis. You have a very good point, and if possible you should certainly let Kafka handle the assignment of partitions to consumers.

There is an alternative to calling consumer.Assign(...) with explicit partitions. If you subscribe to the topic instead, Kafka will notify your consumers when a partition is revoked from or assigned to them. For example, the .NET client library's ConsumerBuilder has 'SetPartitionsRevokedHandler' and 'SetPartitionsAssignedHandler' methods for registering handlers that consumers can use to manage their offsets.

When a partition is revoked, persist your last processed offset for each partition being revoked to the database. When a new partition is assigned, get the last processed offset for that partition from the database and use that.
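
The revoke/assign bookkeeping can be sketched without a Kafka client at all. The Java class below is only an illustration under stated assumptions: RebalanceOffsetTracker and its methods are hypothetical names, a plain Map stands in for the database, and partitions are identified by strings such as "my-topic-0".

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Simulates the bookkeeping done in the rebalance handlers (no Kafka client involved).
class RebalanceOffsetTracker {
    private final Map<String, Long> database;                     // stand-in for the external store
    private final Map<String, Long> positions = new HashMap<>();  // partitions this consumer owns

    RebalanceOffsetTracker(Map<String, Long> database) {
        this.database = database;
    }

    // On assignment: look up where the previous owner stopped (0 if never processed).
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            positions.put(p, database.getOrDefault(p, 0L));
        }
    }

    // On revocation: persist the next offset to read and stop tracking the partition.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            Long next = positions.remove(p);
            if (next != null) {
                database.put(p, next);
            }
        }
    }

    // Called after a message at the given offset has been processed.
    void recordProcessed(String partition, long offset) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException("partition not assigned: " + partition);
        }
        positions.put(partition, offset + 1);
    }

    boolean owns(String partition) {
        return positions.containsKey(partition);
    }

    // Next offset this consumer would read from the partition.
    long position(String partition) {
        Long next = positions.get(partition);
        if (next == null) {
            throw new IllegalStateException("partition not assigned: " + partition);
        }
        return next;
    }
}
```

If two trackers share the same map, a partition revoked from the first and assigned to the second resumes at the offset right after the last processed message. With a real client, the same logic would sit inside the rebalance handlers, and the looked-up offset would be handed back to the consumer.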

C# Example:

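using System;
using System.Collections.Generic;
using Confluent.Kafka;

public class Program
{
   public static void Main(string[] args)
   {
      // Placeholder settings. Auto-commit is disabled because offsets are kept in the database.
      var config = new ConsumerConfig
      {
         BootstrapServers = "localhost:9092",
         GroupId = "my-group",
         EnableAutoCommit = false
      };

      using (
         var consumer = new ConsumerBuilder<string, string>(config)
                      .SetErrorHandler(ErrorHandler)
                      .SetPartitionsRevokedHandler(HandlePartitionsRevoked)
                      .SetPartitionsAssignedHandler(HandlePartitionsAssigned)
                      .Build()
      )
      {
         // Subscribe (instead of Assign) so that Kafka distributes the partitions.
         consumer.Subscribe("my-topic");
         while (true)
         {
            var result = consumer.Consume();
            // Process result.Message and store result.TopicPartitionOffset
            // in the database as one atomic operation.
         }
      }
   }

   static void ErrorHandler(IConsumer<string, string> consumer, Error error)
   {
      Console.Error.WriteLine(error.Reason);
   }

   // Called when partitions are taken away from this consumer.
   static void HandlePartitionsRevoked
   (
      IConsumer<string, string> consumer,
      List<TopicPartitionOffset> currentTopicPartitionOffsets
   )
   {
      // Persist the last processed offset for each partition in
      // 'currentTopicPartitionOffsets'.
      Persist(currentTopicPartitionOffsets);
   }

   // Called when partitions are given to this consumer. The returned offsets
   // tell the consumer where to start reading each partition.
   static IEnumerable<TopicPartitionOffset> HandlePartitionsAssigned
   (
      IConsumer<string, string> consumer,
      List<TopicPartition> tps
   )
   {
      List<TopicPartitionOffset> tpos = FetchOffsetsFromDbForTopicPartitions(tps);
      return tpos;
   }

   // Application-specific database access, left as placeholders here.
   static void Persist(List<TopicPartitionOffset> offsets) => throw new NotImplementedException();
   static List<TopicPartitionOffset> FetchOffsetsFromDbForTopicPartitions(List<TopicPartition> tps) => throw new NotImplementedException();
}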

Java Example from the ConsumerRebalanceListener Docs:

If writing in Java, there is a 'ConsumerRebalanceListener' interface that you can implement. You then pass your implementation of the interface into the consumer.subscribe(topics, listener) method. The example below is taken verbatim from the Kafka docs linked above:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
       private Consumer<?,?> consumer;

       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
           this.consumer = consumer;
       }

       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // save the offsets in an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              saveOffsetInExternalStore(consumer.position(partition));
       }

       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // read the offsets from an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              consumer.seek(partition, readOffsetFromExternalStore(partition));
       }
}

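If my understanding is correct, you would call the Java version like this: consumer.subscribe(Collections.singletonList("my-topic"), new SaveOffsetsOnRebalance(consumer)). Note that subscribe takes a collection of topic names rather than a single string.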

For more information, see the 'Storing Offsets Outside Kafka' section of the Kafka docs.

Here's an excerpt from those docs that summarizes how to store the partitions and offsets for exactly-once processing:

Each record comes with its own offset, so to manage your own offset you just need to do the following:

  • Configure enable.auto.commit=false
  • Use the offset provided with each ConsumerRecord to save your position.
  • On restart restore the position of the consumer using seek(TopicPartition, long).

This type of usage is simplest when the partition assignment is also done manually (this would be likely in the search index use case described above). If the partition assignment is done automatically special care is needed to handle the case where partition assignments change. This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener). For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by implementing ConsumerRebalanceListener.onPartitionsRevoked(Collection). When partitions are assigned to a consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer to that position by implementing ConsumerRebalanceListener.onPartitionsAssigned(Collection).

Another common use for ConsumerRebalanceListener is to flush any caches the application maintains for partitions that are moved elsewhere.
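
As a rough illustration of the first step in the excerpt, here is how the consumer configuration might be built in Java using only java.util.Properties. The class name ManualOffsetConfig, the build method and the example server and group values are assumptions for this sketch; the property keys and the StringDeserializer class name are standard Kafka consumer settings.

```java
import java.util.Properties;

// Builds consumer settings for an application that stores offsets outside Kafka.
class ManualOffsetConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are saved in the external store, so Kafka must not commit them automatically.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer constructor, for example with ManualOffsetConfig.build("localhost:9092", "my-group"). The remaining two steps (saving each record's offset and calling seek on restart or reassignment) are what the rebalance listener shown earlier takes care of.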