2 votes

I was going through this article, which explains how to ensure a message is processed exactly once by doing the following (a rough sketch of my understanding follows the list):

  • Read (topic, partition, offset) from the database on start/restart
  • Read the message from that specific (topic, partition, offset)
  • Atomically do the following (say, for example, in the same database transaction):
    • Process the message
    • Commit the offset to the database as (topic, partition, offset)
    • Manually commit the offset to Kafka by calling consumer.commitAsync() or consumer.commitSync()
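
Roughly, this is how I understand that flow; a minimal sketch, where the topic name, the JDBC URL and the loadOffset/saveOffset/processMessage helpers are placeholders of mine, not from the article:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.sql.*;
import java.time.Duration;
import java.util.*;

public class DbOffsetConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "atomic-consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // the article says false
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // used only if no stored offset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("orders", 0);            // example topic/partition
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection db = DriverManager.getConnection("jdbc:mysql://localhost/app")) {

            consumer.assign(Collections.singletonList(tp));

            // 1. Read (topic, partition, offset) from the database on start/restart.
            long storedOffset = loadOffset(db, tp);      // placeholder helper
            if (storedOffset >= 0) {
                consumer.seek(tp, storedOffset + 1);     // next message after the last processed one
            }

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    db.setAutoCommit(false);
                    try {
                        // 2. Process the message and store its offset in the SAME transaction.
                        processMessage(db, record);                     // placeholder helper
                        saveOffset(db, tp, record.offset());            // placeholder helper
                        db.commit();
                    } catch (Exception e) {
                        db.rollback();
                        throw e;
                    }
                    // 3. Also commit the offset to Kafka, as the article suggests.
                    consumer.commitSync(Collections.singletonMap(
                            tp, new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    // Placeholder helpers; real implementations depend on your schema.
    static long loadOffset(Connection db, TopicPartition tp) { return -1; }
    static void saveOffset(Connection db, TopicPartition tp, long offset) {}
    static void processMessage(Connection db, ConsumerRecord<String, String> record) {}
}
```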

My doubt is: what is the effect of setting different values for the following consumer properties?

  1. enable.auto.commit
    How should I set this property: true or false? The article says we should set it to false. But what can go wrong if I set it to true? In this approach, I am saving the offset to an external database, so after a crash, when the consumer comes online, it will start consuming from the offset saved in the database. So I feel the value of this property has no effect on start/restart.
    Also, I don't feel there will be any effect from different values of this property within a single consumer run, since the offset is used to read the next message, and whether we commit it manually or automatically makes no difference (it will still be the same offset).

  2. auto.offset.reset
    This property has two main values: latest and earliest. If set to latest, the consumer reads only messages produced after it starts. If set to earliest, it reads from the first unread message. Since both only affect where the consumer starts reading when it is started, I feel this property will also have no effect on the atomic consumer described in the article, because in this implementation the newly started consumer starts reading messages from the offset stored in the database.

Am I correct with both of the above thoughts?

What do you mean by saving the offset to an external database? - Divyanshu Jimmy
To "external database". If you read the linked article, it gives example of storing offset in MySQL.Mahesha999
For the case of enable.auto.commit: if you set it to true, then consider the case where the consumer is still doing heavy processing of a message but has already started reading the next offsets. If your consumer fails in this case, the offset already read will be saved. When the system comes up again you won't have the actual offset of the message that failed during processing, hence you lose messages. - Divyanshu Jimmy

2 Answers

0 votes
  1. enable.auto.commit
    When the consumer restarts after a crash, it will start consuming the topic-partition from the offset fetched from the database. The value of this property matters within a single, crash-free run of the consumer, just as it would in any other setup.

    Auto commit makes the consumer commit offsets every 5 seconds by default (the value of auto.commit.interval.ms); the commit itself is performed during poll() calls.

    Manual commit (enable.auto.commit=false) helps avoid non-processing of a message. For example, if the auto-commit 5-second timer expires between reading and processing a message, the sequence may end up being (read, commit, process). If the consumer crashes after the commit but before processing the message (read, commit, crash), that message will never be processed, because on the next poll() the consumer will fetch the next message (the commit was successful). We can prevent this by committing manually in the order (read, process, commit); see the sketch after this list.

    However, with this order there is a chance that the consumer crashes after processing but before committing (read, process, crash). This leads to reprocessing of the same message when the consumer restarts and polls again.

    To avoid this duplicate processing, we store the offset in an external database and fetch it on consumer restart. Note that storing the offset in the database and fetching it on restart also avoids the non-processing of a message in the (read, commit, crash) sequence that can occur with auto commit.

    Thus, in short, manual commit serves no extra purpose when we are storing the offset in an external database, so we can set enable.auto.commit to either true or false. However, if it is set to false and we neither commit explicitly nor store the offset, then after a restart the consumer will read and process the same messages again.

  2. auto.offset.reset

    Its value has an impact only when there is no offset for the given topic-partition in the database. This happens when the consumer is started for the first time, or when the database is truncated. In that case we want the consumer to start consuming from the first message that has not yet been consumed by any consumer in its consumer group, so we set this property to earliest.
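
A minimal sketch of the manual (read, process, commit) order from point 1; the topic name and the process() helper are placeholders of mine:

```java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// (read, process, commit): commit only after processing, so a crash can at worst
// cause reprocessing, never a skipped message.
public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));   // example topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // read
                for (ConsumerRecord<String, String> record : records) {
                    process(record);                                   // process (placeholder)
                }
                if (!records.isEmpty()) {
                    consumer.commitSync();                             // commit only after processing
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) { /* application logic goes here */ }
}
```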

0 votes

If you use Kafka Streams, it supports the exactly-once stream pattern, which you can refer to.

The exactly-once stream pattern is simply the ability to execute a read-process-write operation exactly one time. It means you consume one message at a time, process it, publish it to another topic, and commit. The commit is then handled by Streams automatically, one message at a time.
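
A minimal sketch of such a Streams application, assuming a recent Kafka Streams version; the topic names, application id and the mapValues step are placeholders:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

// A read-process-write topology; with exactly-once enabled, Streams commits the
// consumed offsets and the produced records together.
public class ExactlyOnceStreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-demo");   // example id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Exactly-once processing guarantee ("exactly_once_v2" on recent clients/brokers).
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())                // placeholder processing step
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```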

If you do not have a specific use case that requires it, it is better to go with the commit that Kafka provides; otherwise you need to handle many failure scenarios manually, e.g. what happens if you get a DB connectivity issue, and when handling millions of records you need very frequent DB access. Kafka already stores the committed offsets, which is better in terms of performance and does not require any external DB connection.

__consumer_offsets stores information about the committed offsets for each topic and partition, per consumer group. Calling commit updates the offset details on this topic.
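
For example, a sketch of how you could inspect a group's committed offsets programmatically with the Admin client (the group id is just an example):

```java
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Properties;

// Reads the offsets a consumer group has committed (backed by __consumer_offsets).
public class ShowCommittedOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("my-group")         // example group id
                         .partitionsToOffsetAndMetadata()
                         .get();
            offsets.forEach((tp, om) ->
                    System.out.println(tp + " -> committed offset " + om.offset()));
        }
    }
}
```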

If you have a specific use case that requires atomic transactions, or you would like to keep the offset details close to your results, you can manage external offset storage as mentioned here.

enable.auto.commit: When using external offset management you are not using Kafka's commit at all, so it doesn't matter whether enable.auto.commit is true or false. You will keep fetching messages using the seek(TopicPartition partition, long offset) method and storing the offsets externally; on restart, you start fetching from the latest stored offset. The only impact is that built-in dashboards such as Confluent Control Center or Grafana that monitor the Kafka topic will not reflect consumer progress if you do not commit manually and enable.auto.commit is false.
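
A rough sketch of that external-offset pattern, using seek() from a ConsumerRebalanceListener; the topic name and the readOffsetFromExternalStore() helper are placeholders:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

// Subscribe with group management but take offsets from external storage:
// whenever partitions are assigned, seek() to the externally stored position.
public class ExternalOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "external-offset-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // irrelevant here, but explicit
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition tp : partitions) {
                    consumer.seek(tp, readOffsetFromExternalStore(tp) + 1);  // placeholder lookup
                }
            }
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // flush the last processed offsets to the external store here
            }
        });

        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                // process the record and save (topic, partition, offset) to the external store
            }
        }
    }

    static long readOffsetFromExternalStore(TopicPartition tp) { return 0L; }  // placeholder
}
```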

auto.offset.reset: Yes, it only has an impact when you start for the first time, but since you are using seek() to fetch messages from a specific partition and offset, it has no impact here.

=======================Updated=======================

enable.auto.commit - If true (default), periodically commit offset of the last message handed to the application. The committed offset will be used when the process restarts to pick up where it left off.

auto.commit.interval.ms - The frequency in milliseconds that the consumer offsets are committed (written) to offset storage.

Note: if enable.auto.commit is false, auto.commit.interval.ms is not used.

When enable.auto.commit is true, the commit logic runs on every poll(), and once auto.commit.interval.ms has passed the offsets are actually committed. A rough illustration follows the list below.

  1. poll interval > commit interval: offsets are committed at each poll(), i.e. at the poll interval.
  2. poll interval < commit interval: the commit logic runs on each poll(), but the offsets are only committed on the first poll() after the commit interval has passed.
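
As a rough illustration of that timing (the values and topic name are arbitrary):

```java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Auto commit: the commit check runs inside poll(); offsets are actually written
// only once at least auto.commit.interval.ms has elapsed since the last auto commit.
public class AutoCommitTiming {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "auto-commit-demo");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");   // 5 s (the default)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));        // example topic
            while (true) {
                // Polling every 1 s: the first few poll() calls only check the timer;
                // the poll() after the 5 s interval has elapsed performs the commit.
                // Polling every 10 s instead would mean every poll() commits.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> { /* process */ });
            }
        }
    }
}
```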