4
votes

We have a scenario in our Storm topology where the KafkaSpouts are unable to consume any messages from the topics. The Spout continuously logs the same WARN message:

Got fetch request with offset out of range

...
2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.084 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.098 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.104 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.111 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
...

The Spout is configured to read the last committed offset from Zookeeper, and in this scenario that offset is greater than the newest message offset in Kafka. We are also looking into why the topic's offsets are resetting.

Currently we resolve the issue by watching for the out-of-range warning in the Storm logs, deleting the Zookeeper offset entry, and then re-deploying the topology.

Did you recently point your spout to a new topic name? Do you have multiple spouts with the same ID? – David DeMar
The issue mainly occurs in dev environments, so I could see a 'misleading' offset being committed to Zookeeper. I am hoping to configure the KafkaSpout to give up the endless loop of fetching with the out-of-range offset and fall back to either the earliest or latest offset. – sodwyer

2 Answers

0
votes

In my case, this was happening because I had recreated the Kafka topic which my KafkaSpout was subscribing to.

The offset for a particular partition is saved in Zookeeper, and if a topic is deleted and then created again, you will have to delete the offset info from Zookeeper manually.

Just open the Zookeeper CLI and delete the path of the node belonging to the consumer group-id of your KafkaSpout. For help, refer to https://www.tutorialspoint.com/zookeeper/zookeeper_cli.htm
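As a rough sketch, the session could look like the following. The Zookeeper host, the zkRoot (`/kafka-spout-offsets`) and the spout id (`my-spout-id`) are placeholders here; substitute whatever you passed to your SpoutConfig, since storm-kafka commits offsets under `<zkRoot>/<spout-id>/partition_<n>`.

```shell
# Connect to the Zookeeper ensemble (host/port are placeholders)
bin/zkCli.sh -server somehost.org:2181

# Inside the CLI: list the committed partition nodes for the spout,
# then delete the stale offset node(s)
ls /kafka-spout-offsets/my-spout-id
delete /kafka-spout-offsets/my-spout-id/partition_0
```

After the stale node is gone, re-deploying the topology lets the spout start from its configured startOffsetTime instead of the invalid committed offset.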

0
votes

If an invalid offset is committed, the client configuration "auto.offset.reset" is used. It accepts the values "smallest" and "largest". If the value is not set, an exception is thrown (as in your case).

For KafkaSpout you can set this value via the KafkaConfig#startOffsetTime field, setting it to either kafka.api.OffsetRequest.EarliestTime() or kafka.api.OffsetRequest.LatestTime().
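A minimal sketch of that configuration, assuming storm-kafka 1.0.x; the Zookeeper connect string, topic, zkRoot and spout id are placeholders, not values from the question:

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

// Placeholder connection details and ids
BrokerHosts hosts = new ZkHosts("somehost.org:2181");
SpoutConfig spoutConfig = new SpoutConfig(
        hosts, "my-topic", "/kafka-spout-offsets", "my-spout-id");

// When the committed offset is out of range, start from the earliest
// available offset instead of looping on the invalid one
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

KafkaSpout spout = new KafkaSpout(spoutConfig);
```

Note that startOffsetTime only changes where the spout falls back to; it does not explain why the committed offsets became invalid in the first place.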

See http://storm.apache.org/releases/1.0.2/storm-kafka.html