Our topology uses KafkaSpout to fetch messages from Kafka topics. We have ~150 topics with ~12 partitions each, and 8 Storm executors/tasks on 2 Storm nodes. Storm version is 1.0.5, Kafka brokers are version 10.0.2, Kafka clients are version 0.9.0.1. We do not delete Kafka topics.
At some point in time I observed a huge number of repetitive WARN messages in worker.log:
2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host1:9092, topic=topic_1, partition=10} Got fetch request with offset out of range: [9248]
2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host2:9092, topic=topic_2, partition=0} Got fetch request with offset out of range: [22650006]
2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host3:9092, topic=topic_3, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host1:9092, topic=topic4, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host2:9092, topic=topic5, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host1:9092, topic=topic6, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host2:9092, topic=topic6, partition=10} Got fetch request with offset out of range: [9248]
For some reason, the same constant offset value was being used for the same partition number of different topics.
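One way such identical offsets could arise, purely as a hypothesis on my part, is if the persisted offset state were keyed without the topic name, so that different topics sharing a partition number read and write the same entry. This is an illustrative sketch, not storm-kafka code; all names in it are made up:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration (NOT storm-kafka code): if the state key used to
// persist offsets omitted the topic name, every topic's manager for the same
// partition number would read and write one shared entry.
public class SharedOffsetSketch {
    // key built from (consumerId, partition) only -- topic is missing
    static final Map<String, Long> zkState = new HashMap<>();

    static String key(String consumerId, int partition) {
        return consumerId + "/" + partition;   // no topic component
    }

    public static void main(String[] args) {
        // topic_1 commits offset 9248 for partition 10
        zkState.put(key("topology1", 10), 9248L);
        // topic_6 later reads "its" offset for partition 10 -- same entry
        long recovered = zkState.get(key("topology1", 10));
        System.out.println(recovered); // 9248, although topic_6 never wrote it
    }
}
```

I have not verified that this is what happens in our setup; it merely shows a key-collision mechanism that would produce the symptom.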
I enabled DEBUG logging and examined the log files more closely:
2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host3:9092, topic=topic1, partition=8} for topology: topology1
2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host1:9092, topic=topic2, partition=8} for topology: topology1
2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host2:9092, topic=topic3, partition=8} for topology: topology1
2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host1:9092, topic=topic4, partition=8} for topology: topology1
2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host2:9092, topic=topic5, partition=8} for topology: topology1
2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host3:9092, topic=topic6, partition=8} for topology: topology1
I noticed that a subset of the topics was split into two independent groups, each consisting of 31 topics. All the topics within a group used the same offset value for each partition. That value was not constant, however: it varied between 8 different values, and each of those 8 values was correct for one particular topic in the group. Moreover, each of these values grew over time, and all the topics in the group updated it synchronously. Most of the topics (55 of 62) in the two groups had a corresponding 'offset out of range' WARN message, but with the constant value. The other 7 topics kept working correctly without WARN messages, though their offset values changed as well.
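For context on why fixed groups of topics show up on the same executor threads: to my understanding, storm-kafka assigns the global list of (topic, partition) pairs to tasks round-robin by index, so each task deterministically owns the same subset. This is a simplified sketch of that assignment logic (the method name and list representation here are my own, not the actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of round-robin partition assignment as I understand it
// from storm-kafka: the global list of (topic, partition) pairs is spread
// over tasks by index modulo the task count, so a given task always serves
// the same fixed subset of topic-partitions.
public class AssignmentSketch {
    public static List<String> partitionsForTask(List<String> allPartitions,
                                                 int totalTasks, int taskIndex) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % totalTasks == taskIndex) {
                mine.add(allPartitions.get(i));
            }
        }
        return mine;
    }
}
```

With ~150 topics × ~12 partitions and 8 tasks, each task would repeatedly serve the same deterministic set of topic-partitions, which at least explains why the same topics always appear together in one thread's log lines. It does not by itself explain why their offsets became equal.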
I went through the source code of storm-kafka and noticed that the useStartOffsetTimeIfOffsetOutOfRange flag doesn't help in our case, because we have no failed tuples and the Kafka offset is less than _emittedToOffset. So the same WARN message is logged again and again:
} catch (TopicOffsetOutOfRangeException e) {
    offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
    // fetch failed, so don't update the fetch metrics
    //fix bug [STORM-643] : remove outdated failed offsets
    if (!processingNewTuples) {
        // For the case of EarliestTime it would be better to discard
        // all the failed offsets, that are earlier than actual EarliestTime
        // offset, since they are anyway not there.
        // These calls to broker API will be then saved.
        Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
        // Omitted messages have not been acked and may be lost
        if (null != omitted) {
            _lostMessageCount.incrBy(omitted.size());
        }
        _pending.headMap(offset).clear();
        LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
    }
    if (offset > _emittedToOffset) {
        _lostMessageCount.incrBy(offset - _emittedToOffset);
        _emittedToOffset = offset;
        LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
    }
    return;
}
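To make the stuck state concrete, here is a minimal simulation of the loop described above (my own sketch, not storm-kafka code): the spout keeps fetching at _emittedToOffset, the broker reports the offset out of range, the recovery path looks up EarliestTime, but because that earliest offset is below _emittedToOffset, the guard never advances it and the same WARN repeats on every fetch. The offset values are taken from the logs above; the earliest offset is a hypothetical number:

```java
// Minimal simulation of the repeated-WARN condition (NOT storm-kafka code).
public class RepeatedWarnSketch {
    public static long recover(long emittedToOffset, long earliestOffset) {
        // mirrors: if (offset > _emittedToOffset) { _emittedToOffset = offset; }
        if (earliestOffset > emittedToOffset) {
            return earliestOffset;   // offset advances, WARN stops
        }
        return emittedToOffset;      // unchanged -> next fetch fails again
    }

    public static void main(String[] args) {
        long emitted = 9248L;   // offset from the WARN messages above
        long earliest = 9000L;  // hypothetical EarliestTime result
        System.out.println(recover(emitted, earliest)); // 9248 -- stuck
    }
}
```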
However, I don't understand how it is possible that _emittedToOffset got the same value for different topics. Do you have any ideas why this could happen?
– Nikita Gorbachevski