

I am using the consumer from https://github.com/confluentinc/confluent-kafka-go. The Kafka version is 0.10.1.0.

Here is the configuration of my consumer:

kafkaClient, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               broker,
    "group.id":                       "udwg20",
    "session.timeout.ms":              60000,
    "go.events.channel.enable":        true,
    "go.application.rebalance.enable": true,
    "default.topic.config": kafka.ConfigMap{
        "auto.offset.reset":      "earliest",
        "enable.auto.commit":      true,
        "auto.commit.interval.ms": 10000}})

At first, the current offset and lag were shown for every partition. After the consumer had been running for several hours, the offset and lag of some partitions (those that had not received any new messages) became unknown. When a message arrives on a partition whose offset and lag are unknown, the offset and lag become visible again and the message is consumed.

If I restart the consumer while some partitions have an unknown current offset and lag, those partitions are consumed again from the beginning, while the other partitions appear to continue normally.

I also ran a Python consumer that reads from this topic with a different consumer group id. The Python consumer seems to work well: none of its partitions show an unknown current offset or lag.


2 Answers

1
votes

offsets.retention.minutes controls the cleanup of offsets for inactive consumer groups. If a consumer group does not commit any offset for offsets.retention.minutes (defaults to 24h), Kafka will clean up its offsets. This is why the offset and lag are shown as unknown.

You can increase the offset retention period; however, be aware that old consumer groups will then keep occupying space in the __consumer_offsets topic.

0
votes

I used the command below to check whether the offsets of my consumer group id were being committed periodically.

echo exclude.internal.topics=false > consumer.properties

kafka-console-consumer --consumer.config consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Although I set enable.auto.commit to true, offsets are not committed periodically for partitions whose lag is 0. The current offset for those partitions is removed after 2 to 3 hours, even though the consumer group is still active.

To solve this issue, I set enable.auto.commit to false and wrote my own function that commits the offsets every 5 seconds.

Here is the idea: when the consumer gets a new Message event or an end-of-partition (PartitionEOF) event, I take the latest current offset from the event data and store it in a commit map (key: topic_partition, value: kafka.TopicPartition{ Topic, Partition, Offset }). A separate function commits this map periodically (for example, every 5 seconds). When the consumer gets a RevokedPartitions event, I remove the corresponding topic_partition keys from the commit map.
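The bookkeeping described above could look roughly like the following sketch. It uses only the standard library so that it runs on its own; partitionKey, pendingOffset, commitMap and their methods are hypothetical names, not part of confluent-kafka-go. It also assumes two conventions that you should verify against your client version: the committed offset is the last processed offset plus one, and the offset carried by a PartitionEOF event is already the next offset to read.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionKey identifies one topic partition (hypothetical helper type).
type partitionKey struct {
	Topic     string
	Partition int32
}

// pendingOffset is one entry that the periodic commit would send.
type pendingOffset struct {
	partitionKey
	Offset int64
}

// commitMap holds, per assigned partition, the offset to commit next.
type commitMap map[partitionKey]int64

// onMessage records a processed message. Assumption: the offset to commit
// is the processed message's offset plus one.
func (m commitMap) onMessage(topic string, partition int32, offset int64) {
	m[partitionKey{topic, partition}] = offset + 1
}

// onPartitionEOF records the end-of-partition position. Assumption: the
// event's offset is already the next offset to read, so it is stored as is.
func (m commitMap) onPartitionEOF(topic string, partition int32, offset int64) {
	m[partitionKey{topic, partition}] = offset
}

// onRevoked drops partitions that no longer belong to this consumer.
func (m commitMap) onRevoked(revoked []partitionKey) {
	for _, k := range revoked {
		delete(m, k)
	}
}

// snapshot returns the entries to commit, sorted for deterministic output.
func (m commitMap) snapshot() []pendingOffset {
	out := make([]pendingOffset, 0, len(m))
	for k, off := range m {
		out = append(out, pendingOffset{k, off})
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Topic != out[j].Topic {
			return out[i].Topic < out[j].Topic
		}
		return out[i].Partition < out[j].Partition
	})
	return out
}

func main() {
	m := commitMap{}
	m.onMessage("events", 0, 41)     // processed offset 41 on partition 0
	m.onPartitionEOF("events", 1, 7) // partition 1 is idle at position 7
	m.onMessage("events", 2, 99)
	m.onRevoked([]partitionKey{{"events", 2}}) // partition 2 was rebalanced away

	// In the real consumer this loop would run on a timer (for example every
	// 5 seconds) and pass the entries to the client's commit call.
	for _, p := range m.snapshot() {
		fmt.Printf("commit %s[%d] @ %d\n", p.Topic, p.Partition, p.Offset)
	}
}
```

In the actual consumer, each snapshot entry would be converted to a kafka.TopicPartition and passed to the consumer's CommitOffsets method. Handling the timer tick in the same select loop that reads the events channel keeps all access to the map on one goroutine, so no mutex is needed. Recommitting the EOF position for idle partitions is what keeps their offsets from aging out.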