0
votes

While implementing manual offset management (using Kafka 0.9), I encountered the following issue:

In order to manage the offsets manually, for each consumed record I retrieve the record's offset and commit currentOffset + 1 (a committed offset denotes the next record to be consumed, hence the + 1). The consumer's offset reset strategy is "latest".
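A minimal sketch of that per-record commit, assuming an already-configured `KafkaConsumer` named `consumer` and a hypothetical `process()` step (`commitSync(Map<TopicPartition, OffsetAndMetadata>)` expects the offset of the *next* record to read, hence the + 1):

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// 'consumer' is an already-subscribed KafkaConsumer<String, String>
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical per-record processing
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    // Commit the offset of the NEXT record to consume, i.e. current offset + 1.
    consumer.commitSync(
            Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
}
```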

When a new consumer group is created, it has no committed offsets (every offset is "unknown"). Therefore, if the consumer is stopped before it has consumed messages from all existing partitions, it will have committed offsets for only some of the partitions (the ones it got messages from), while the offsets for the rest of the partitions remain "unknown".

When the consumer is started again, it gets only some of the messages that were produced while it was down (only those from partitions that had a committed offset). The messages from partitions with an "unknown" offset are lost and will never be consumed, due to the "latest" offset reset strategy.

Since it's unacceptable in my case to miss any messages once a consumer group is created, I'd like to explicitly commit an offset for each partition before starting consumption.

To do that I found two options:

  1. Use the low-level consumer to send an offset request.
  2. Use the high-level consumer: call consumer.poll(0) (to trigger the assignment), then call consumer.assignment(), and for each TopicPartition call consumer.committed(topicPartition), consumer.seekToEnd(topicPartition) and consumer.position(topicPartition), and finally commit all the offsets.
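Option (2) can be sketched roughly as follows (a sketch only, assuming a subscribed `consumer` and a `topic` variable; note that in 0.9 a single `poll(0)` is not strictly guaranteed to complete the assignment):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList(topic));
consumer.poll(0); // triggers the partition assignment

// Commit the log-end offset for every partition that has no committed offset yet,
// so later restarts never fall back to the "latest" reset strategy.
Map<TopicPartition, OffsetAndMetadata> initialOffsets = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
    if (consumer.committed(tp) == null) {        // offset still "unknown"
        consumer.seekToEnd(tp);                  // move to the log end...
        long endOffset = consumer.position(tp);  // ...and read that position
        initialOffsets.put(tp, new OffsetAndMetadata(endOffset));
    }
}
if (!initialOffsets.isEmpty()) {
    consumer.commitSync(initialOffsets);
}
```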

Both are more complex and noisy than I'd expect (I'd expect a simpler API for getting the log-end offset of all partitions assigned to a consumer).

Any thoughts or ideas for a better implementation would be appreciated.

10x.

I am not sure, but maybe there is some misunderstanding. Why don't you use the reset strategy "earliest"? Sounds like this would solve your problem. – Matthias J. Sax
This is the requirement. The topic is an existing topic with lots of old events, which I shouldn't consume. The requirement is to consume all the messages that were produced from the moment the consumer group is created onward. – Irit Rolnik
I see. Then approach (2) seems best practice to me. It might be more code than you expected, but I don't see a simpler way to get it done, and using the higher-level API is most likely better than using the lower-level API, IMHO. – Matthias J. Sax
However, you can simplify the code by using endOffsets() and commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt;) (instead of seekToEnd(), position(), commit()). – Matthias J. Sax
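That simplification might look like this (a sketch only; `endOffsets()` is available only from Kafka 0.10.x on, and `consumer` is assumed to be already assigned):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Fetch the log-end offset of every assigned partition in one call,
// then commit it for the partitions that have no committed offset yet.
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
Map<TopicPartition, OffsetAndMetadata> initialOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
    if (consumer.committed(entry.getKey()) == null) { // offset still "unknown"
        initialOffsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
    }
}
if (!initialOffsets.isEmpty()) {
    consumer.commitSync(initialOffsets);
}
```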
Thanks. endOffsets() is only available in 0.10 (we still use 0.9). I agree, and I continued with (2). I found another way to simplify the code though: adding a ConsumerRebalanceListener and running this piece of code in onPartitionsAssigned(). Slightly cleaner. – Irit Rolnik

1 Answer

0
votes

Which consumer API to use depends on where you are committing offsets.

  • If your offsets are stored in the Kafka broker, then you should definitely use the high-level (new) consumer API; it will provide you with more control over offsets.
  • If you are keeping offsets in ZooKeeper, then you can use the old consumer API, e.g.:

List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex), 1);
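For context, a sketch of how that call fits into the old (ZooKeeper-based) consumer API; the connection properties and `topicRegex` here are assumptions:

```java
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper address
props.put("group.id", "my-group");                // hypothetical group id

// The old consumer connects via ZooKeeper and keeps its offsets there.
ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// One stream covering all topics matching topicRegex.
List<KafkaStream<byte[], byte[]>> streams =
        consumer.createMessageStreamsByFilter(new Whitelist(topicRegex), 1);
```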