2 votes

I have been consuming events from a Kafka topic and processing them in my application for quite some time. The topic has 20 partitions and I set the Kafka concurrency to 10, since I consume the topic with 2 replicas of my application. I set the commit mode to manual immediate, because I want to commit a partition's offset only once the application has ensured the events are processed.

Everything was fine until the day one or more nodes of our Kafka cluster (we run 3 brokers) went down and were restarted. Once this happened, I saw a lot of rebalancing in the consumer group: consumers were kicked out and rejoined continuously for some time. Then suddenly, each consumer (a group of 10 threads in each replica of my application) printed logs like the following:

"found no committed offset for partition"

"Resetting offset to {NUMBER} for the partition {TOPIC-NUMBER}"

After these logs, each consumer started reading from the earliest available offsets of each partition, all of which had been committed by the application days earlier. Is this normal behavior? I tried to investigate this issue, and all I found is the following:

  1. It is the Kafka broker/server that stores the offset information for each topic, partition, and group-name combination.
  2. Since I commit manually via acknowledge() (I am on a Spring Boot + Kafka stack; see the sketch after this list), the offset advances only after I successfully commit a specific offset. The new committed offset for that consumer will then be committedOffset + 1.
  3. Say, for instance, I start consuming from offset 0 on a specific partition of the topic, and after 10 days of running the application it has reached 10K (for calculation purposes). Until the Kafka brokers were restarted/killed, the consumer was reading offsets in the 10XXX range. Once they were restarted, I started consuming events that were 10 days old. How is this possible? Can all those committed offsets somehow be treated as uncommitted? And if they were never committed, why did this only surface after 10 days? How could the offset have moved at all when I use manual commit? I have no clue.
  4. From the Kafka consumer API (2.5.2), what I understood is that when the server responds with offset -1 for a particular partition, the consumer resets its position using the configured reset strategy (earliest in my case). It is clear that the consumer received -1 during the broker restart, but why that happened is something I don't know.
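For context, this is roughly how my listener is wired (simplified; the topic, group, and process() helper below are placeholder names, not my real ones):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;

@Configuration
public class ManualAckSetup {

    // Container factory configured for manual-immediate commits:
    // acknowledge() commits the record's offset + 1 right away.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(10); // 10 consumer threads per application replica
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        process(record);   // placeholder for the actual business logic
        ack.acknowledge(); // commit only after successful processing
    }

    private void process(ConsumerRecord<String, String> record) {
        // ...
    }
}
```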

My Kafka client properties are as follows:

auto.offset.reset = earliest (but it should not make me re-read already-committed offsets, I guess)
heartbeat.interval.ms = 2000
session.timeout.ms = 50000
enable.auto.commit = false
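For clarity, the same settings expressed as plain Java consumer configuration (the broker address and group id here are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerProps {
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Only consulted when the broker has NO valid committed offset for the group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 2000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }
}
```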

I noticed similar issues were asked here earlier, but none of them resolves the problem conclusively. I list those links here:

Kafka-node suddenly consumes from offset 0
Kafka consumer: starts reading partition from the beginning even thought there's a committed offset
After kafka crashed, the offsets are lost
Kafka partitions and offsets disappeared

1
Restarting the cluster should not modify the __consumer_offsets topic. But if you never committed offsets at all (you should show the code for how you're doing this), then the consumer group never existed to begin with, so it'll perform the offset reset. And depending on how large your segment files get, consuming 10-day-old data is entirely possible. – OneCricketeer

What version is your broker? Before 2.0, the broker used to expire offsets when nothing was consumed for 24 hours, even if the consumer was still connected. In 2.0 it was changed to 7 days. In 2.1 (I think) it was changed to only expire offsets if the consumer is not connected for 7 days. See kafka.apache.org/documentation/#upgrade_200_notable and … – Gary Russell

1 Answer

0 votes

This is not really an answer, more of a suggestion.

Please refer to auto.offset.reset. It is documented that when the broker cannot find a committed offset for a partition (for whatever reason) and auto.offset.reset is set to 'earliest' (which in your case it is), exactly the behavior above will be seen.

Therefore, in your case,

  1. By some means the offsets are not actually being committed (I am not that well versed with Spring, and I don't believe the offsets are managed outside of Kafka); this post could perhaps throw more light on it. In a plain KafkaConsumer, committing is generally done with commitSync or commitAsync; see the sketch after this list.
  2. The committed offset information was lost (which, granted, is less likely, but possible). The offset storage in Kafka was moved from ZooKeeper to an internal commit log (the __consumer_offsets topic). This post provides some details and also includes a reference to this.
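For reference, this is roughly what manual committing looks like with a plain KafkaConsumer, plus one way to test possibility 1 by reading back what the broker has stored via committed(). The topic, group, and broker address are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // process the record ...
            }
            consumer.commitSync(); // synchronously commits position (last polled offset + 1)

            // Read back what the broker actually stored for partition 0;
            // a null value would mean "no committed offset" -> the reset strategy kicks in.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            System.out.println(consumer.committed(Collections.singleton(tp)));
        }
    }
}
```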

You could check the above two possibilities, or at least rule them out through testing, as part of your investigation.