
Our topology uses KafkaSpout to fetch messages from Kafka topics. We have ~150 topics with ~12 partitions each, and 8 Storm executors and tasks on 2 Storm nodes. Storm version is 1.0.5, Kafka brokers version 10.0.2, Kafka clients version 0.9.0.1. We do not delete Kafka topics.
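For context, a topology like the one described is typically wired with the old storm-kafka (not storm-kafka-client) spout roughly as follows. This is only a sketch: the ZooKeeper address, topic name, zkRoot, and id are hypothetical, while the class and field names are the real storm-kafka API.

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class TopologySketch {
    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts("zk1:2181");  // hypothetical ZK address
        SpoutConfig cfg = new SpoutConfig(hosts, "topic_1", "/kafka-spout", "topology1");
        cfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Relevant to the WARN messages below: on an out-of-range fetch the
        // spout is supposed to jump to startOffsetTime (earliest by default).
        cfg.useStartOffsetTimeIfOffsetOutOfRange = true;  // this is the default

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(cfg), 8); // 8 executors, as above
    }
}
```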

At some moment in time I observed a huge number of repetitive WARN messages in worker.log:

    2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host1:9092, topic=topic_1, partition=10} Got fetch request with offset out of range: [9248]
    2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host2:9092, topic=topic_2, partition=0} Got fetch request with offset out of range: [22650006]
    2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host3:9092, topic=topic_3, partition=4} Got fetch request with offset out of range: [1011584]
    2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host1:9092, topic=topic4, partition=4} Got fetch request with offset out of range: [9266]
    2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host2:9092, topic=topic5, partition=4} Got fetch request with offset out of range: [9266]
    2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host1:9092, topic=topic6, partition=4} Got fetch request with offset out of range: [1011584]
    2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host2:9092, topic=topic6, partition=10} Got fetch request with offset out of range: [9248]

For some reason the same constant offset value was used for the same partition number across different topics.

I enabled DEBUG logging and examined the log files more closely:

    2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host3:9092, topic=topic1, partition=8} for topology: topology1
    2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host1:9092, topic=topic2, partition=8} for topology: topology1
    2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host2:9092, topic=topic3, partition=8} for topology: topology1
    2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host1:9092, topic=topic4, partition=8} for topology: topology1
    2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host2:9092, topic=topic5, partition=8} for topology: topology1
    2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host3:9092, topic=topic6, partition=8} for topology: topology1

I noticed that a subset of the topics was split into two independent groups of 31 topics each. Within each group, all the topics used the same offset value for a given partition. That value was not constant, however: it varied among 8 different values, and each of these values was correct for one particular topic in the group. Moreover, each of these values grew over time, and all the topics in the group updated it synchronously. Most of the topics (55 of 62) across the two groups had a corresponding 'offset out of range' WARN message, but with a constant value. The other 7 topics continued to work correctly without WARN messages, although their offset values were changing as well.

I went through the source code of storm-kafka and noticed that the useStartOffsetTimeIfOffsetOutOfRange flag doesn't help in our case, because we don't have failed tuples and the Kafka offset is less than _emittedToOffset. So the same WARN message is logged again and again:

    } catch (TopicOffsetOutOfRangeException e) {
        offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
        // fetch failed, so don't update the fetch metrics

        //fix bug [STORM-643] : remove outdated failed offsets
        if (!processingNewTuples) {
            // For the case of EarliestTime it would be better to discard
            // all the failed offsets, that are earlier than actual EarliestTime
            // offset, since they are anyway not there.
            // These calls to broker API will be then saved.
            Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);

            // Omitted messages have not been acked and may be lost
            if (null != omitted) {
                _lostMessageCount.incrBy(omitted.size());
            }

            _pending.headMap(offset).clear();

            LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
        }

        if (offset > _emittedToOffset) {
            _lostMessageCount.incrBy(offset - _emittedToOffset);
            _emittedToOffset = offset;
            LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
        }

        return;
    }
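To see why the last guard in that catch block never fires in our situation, here is a minimal, self-contained model of the reset logic (plain Java; field names mirror storm-kafka, but the offset values are hypothetical, taken loosely from the logs):

```java
// Minimal model of the catch block quoted above; not the actual storm-kafka code.
public class OffsetOutOfRangeSketch {
    static long emittedToOffset = 22650006L; // offset the spout will fetch next

    // On an out-of-range fetch, storm-kafka resets to the broker's earliest
    // offset -- but only moves forward when that offset is AHEAD of us.
    static long handleOutOfRange(long brokerEarliestOffset) {
        if (brokerEarliestOffset > emittedToOffset) {
            emittedToOffset = brokerEarliestOffset; // guard fires, spout recovers
        }
        // Otherwise emittedToOffset is untouched: the next fetch asks for the
        // same out-of-range offset and the same WARN is logged again.
        return emittedToOffset;
    }

    public static void main(String[] args) {
        // The broker's earliest offset is BELOW what the spout thinks it
        // emitted, so the state never changes no matter how often we retry.
        System.out.println(handleOutOfRange(9248L)); // 22650006
        System.out.println(handleOutOfRange(9248L)); // 22650006 -- stuck
    }
}
```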

However, I don't understand how it is possible that _emittedToOffset got the same value for different topics. Do you have any ideas why this could happen?
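One generic way such cross-topic sharing can arise, purely as an illustration and not a claim about the actual storm-kafka defect: if per-partition state is keyed by an object whose equals/hashCode ignore the topic, then topics with the same partition number silently collapse into one map entry, which matches the symptom in the DEBUG logs (same offset, same partition number, different topics):

```java
import java.util.HashMap;
import java.util.Map;

public class SharedOffsetIllustration {
    // Hypothetical key type: equals/hashCode use only the partition number
    // and forget the topic. NOT the actual storm-kafka code.
    static final class PartitionKey {
        final String topic;
        final int partition;
        PartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        @Override public boolean equals(Object o) {
            return o instanceof PartitionKey
                && ((PartitionKey) o).partition == partition; // topic ignored!
        }
        @Override public int hashCode() {
            return Integer.hashCode(partition);               // topic ignored!
        }
    }

    public static void main(String[] args) {
        Map<PartitionKey, Long> committedOffsets = new HashMap<>();
        // Two different topics, same partition number...
        committedOffsets.put(new PartitionKey("topic_1", 8), 1572936L);
        committedOffsets.put(new PartitionKey("topic_4", 8), 61292573L);
        // ...collapse into one entry: the second write clobbers the first,
        // so both topics now "share" offset 61292573 for partition 8.
        System.out.println(committedOffsets.size());                              // 1
        System.out.println(committedOffsets.get(new PartitionKey("topic_1", 8))); // 61292573
    }
}
```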

If you create two unique topics with one partition each and send one message to each, both topics are at offset 1... Why can't topics have the same offset value? – OneCricketeer
They can, but in our case all the topics have different, unique offsets. – Nikita Gorbachevski
My point is that that is not guaranteed. – OneCricketeer
It might be worth seeing which broker is the leader for each topic partition, but what I find interesting is that in your second quote, it's all partition 8. – OneCricketeer
It is; I checked the offsets in Kafka using its utilities. It's definitely incorrect behaviour of KafkaSpout. – Nikita Gorbachevski

1 Answer


There is a bug in the storm-kafka source code which occurs when Kafka brokers fail. Here are the corresponding JIRA ticket and pull request with the fix.