3 votes

I have a requirement to read messages from a topic, batch them, and push the batch to an external system. If the batch fails for any reason, I need to consume the same set of messages again and repeat the process, so for every batch the from and to offsets of each partition are stored in a database.

To achieve this, I create one Kafka consumer per partition by assigning the partition to the reader; based on the previously stored offsets, the consumer seeks to that position and starts reading. I have turned off auto commit, and I don't commit offsets from the consumer. For every batch, I create a new consumer per partition, read messages starting from the last stored offset, and publish them to the external system.

Do you see any problems with consuming messages without committing offsets and reusing the same consumer group across batches, given that at any point there won't be more than one consumer per partition?
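Here is a simplified sketch of one per-partition reader; fetchStoredOffset, publishBatch, and storeOffsets are placeholders for my actual database and publishing code:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionBatchReader {

    void processBatch(String topic, int partition) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-reader");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // auto commit off; we never commit manually either
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition(topic, partition);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp)); // manual assignment, one consumer per partition
            long from = fetchStoredOffset(tp);              // "to" offset of the previous successful batch
            consumer.seek(tp, from);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            if (!records.isEmpty()) {
                publishBatch(records);                                 // push to the external system; may throw
                storeOffsets(tp, from, consumer.position(tp));         // persist offsets only after success
            }
        }
        // If publishBatch throws, nothing is stored, so the next consumer
        // seeks to the same "from" offset and re-reads the same batch.
    }

    // Placeholders for the database and external-system code described above.
    long fetchStoredOffset(TopicPartition tp) { return 0L; }
    void publishBatch(ConsumerRecords<String, String> records) { }
    void storeOffsets(TopicPartition tp, long from, long to) { }
}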


2 Answers

4 votes

Your design seems reasonable to me.

Committing offsets to Kafka is just a convenient built-in mechanism for keeping track of offsets. There is no requirement whatsoever to use it; you can track offsets with any other mechanism, such as a database, as in your case.

Furthermore, if you assign partitions manually, there is no group management at all, so the group.id parameter has no effect. See http://docs.confluent.io/current/clients/consumer.html for more details.
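As an illustration, here is a minimal sketch of keeping offsets entirely in your own store; the offsets table schema (topic, kafka_partition, next_offset) is just an assumption for the example, not something your question specifies:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.kafka.common.TopicPartition;

// Offsets live in your own database; Kafka's internal __consumer_offsets
// topic is never involved because nothing is ever committed.
public class DbOffsetStore {

    private final Connection conn;

    public DbOffsetStore(Connection conn) {
        this.conn = conn;
    }

    // Where the next batch should start reading; defaults to 0 for a new partition.
    public long nextOffset(TopicPartition tp) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT next_offset FROM offsets WHERE topic = ? AND kafka_partition = ?")) {
            ps.setString(1, tp.topic());
            ps.setInt(2, tp.partition());
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) : 0L;
            }
        }
    }

    // Record the position reached by a successfully published batch.
    public void save(TopicPartition tp, long nextOffset) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "UPDATE offsets SET next_offset = ? WHERE topic = ? AND kafka_partition = ?")) {
            ps.setLong(1, nextOffset);
            ps.setString(2, tp.topic());
            ps.setInt(3, tp.partition());
            ps.executeUpdate();
        }
    }
}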

1 vote

With Kafka version 2 I achieved this behaviour without needing a database to store the offsets. The following configuration is for Spring Boot with spring-kafka, but the approach should work with any Kafka consumer API.

spring:
  kafka:
    bootstrap-servers: ...
    consumer:
      value-deserializer: ...
      max-poll-records: 1000
      enable-auto-commit: false
      fetch-min-size: 262144 # 256 KiB (1/4 MB)
      group-id: ...
      fetch-max-wait: 10000 # a poll returns every 10 s, or sooner once 256 KiB or 1000 records have accumulated
      auto-offset-reset: earliest
    listener:
      type: batch
      concurrency: 7
      ack-mode: manual

This gives me messages in batches of at most 1000 records (depending on load). I then write these records asynchronously to a database and count how many success callbacks I get. If the number of successful writes equals the received batch size, I acknowledge the batch, i.e. I commit the offsets. This design proved very reliable even in a high-load production environment.
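For completeness, here is a sketch of the matching listener; the topic name and writeAsync are placeholders, and CompletableFuture.allOf stands in for my counting of success callbacks:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class BatchListener {

    // type: batch and ack-mode: manual (see the YAML above) deliver the whole
    // batch plus an Acknowledgment for committing offsets manually.
    @KafkaListener(topics = "my-topic")
    public void onBatch(List<String> records, Acknowledgment ack) {
        // Kick off one asynchronous database write per record.
        CompletableFuture<?>[] writes = records.stream()
                .map(this::writeAsync)
                .toArray(CompletableFuture[]::new);

        // join() throws if any write failed; in that case the batch is not
        // acknowledged, so its offsets are never committed and the records
        // can be consumed again.
        CompletableFuture.allOf(writes).join();
        ack.acknowledge(); // all writes succeeded: commit the batch's offsets
    }

    // Placeholder for the real asynchronous database write.
    private CompletableFuture<Void> writeAsync(String record) {
        return CompletableFuture.completedFuture(null);
    }
}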