12
votes

We are facing a problem with Kafka. We have a topic with only a partition and only one consumer in a consumer group. The consumer has been stopped for a month. In the meantime, producers are sending messages to the topic.

When we start the consumer again, it is not able to consume messages. I assume that the previously committed offset has been lost, so the consumer has no idea to find the starting point when awaken.

When we stop and start the consumer again, then the consumer can pick up the new messages, but all message that has been sent previously never got consumed.

Has offset been corrupted? Does the retention period for the kafka internal topic offsets, mean that the last committed offsets been has removed?

3
hi @matthias-j-sax , presuming we have facing problem with offset management, what is the best practice to maintain offset, is this a good idea to have offset maintained outside of kafka cluster, such as RDMS, so in the event of losing offset, we can restart consumer from the offset which stored in DB?Joey Trang
Simplest thing is to change broker setting an increase offset retention time. If you set it to INT_MAX you get multiple years... That should do. If you really need longer retention time, you can store offset anywhere you want like an DB, too. But it's of course, more code to maintain in you own code base.Matthias J. Sax
I am abit confused as even I have set offset.retention.minutes to 10 mins, I was expecting offsets of some particular groups will be removed because of no active consumer group at present, but when I monitor log there is no offset expired [2017-03-02 06:57:24,907] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager) @MatthiasJ.SaxJoey Trang
Not sure... how long did you wait? retention.time is a guaranteed lower bound, and there is no strict enforcement to delete the offsets immediately -- there is also a clenup interval that you can configure.Matthias J. Sax

3 Answers

10
votes

The retention value can be configured in kafka broker using:

offsets.retention.minutes

The default is 24 hours. See: Kafka consumer group offset retention

Edit: As of Kafka 2.0, the default value is 7 days.

3
votes

we were also facing the same issue and I have researched a bit about it. I have the changed below steps it resolved .

We have retention of 14 days so we changed the kafka offsets topic retention to 14 days

We have changed cleanup.policy from compact to delete by running below

./kafka-configs.sh  --alter --zookeeper localhost:2181 --entity-type topics
              --entity-name __consumer_offsets --add-config cleanup.policy=delete

Updated config for topic: "__consumer_offsets".

1
votes

The default retention period for the message(offset) in kafka is a week i.e)7 days

After the retention period all the existing offsets will be deleted from the broker due to avoid overflow of the space.

so the previous committed offset will be changed to latest offset as the previous offset has been removed by kafka / zookeeper.

If you want to have your message for long period, Specify --retention-period value while creating topic in kafka.