1
votes

Is there a way to find the total Kafka lag for all consumers in the same consumer group?

I can only get the lag for the partitions assigned to the current consumer. For example, if only one partition is assigned to a consumer, the code below reports the lag for that partition only, not for the other partitions of the topic.

Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);
for (TopicPartition tp : partitionSet) {
    LOG.info("Topic:{}, EndOffset:{}, currentOffset:{}, LAG:{}",
            tp.topic(), endOffsets.get(tp), consumer.position(tp), endOffsets.get(tp) - consumer.position(tp));
}

Basically, I would like to find the sum of the lags across all partitions, to understand how far all consumers of a topic (in the same group) are lagging behind.

Also, is there an API similar to kafka-consumer-groups that takes the bootstrap server and group as arguments and returns the lag?

./kafka-consumer-groups.sh --bootstrap-server --group --describe

1
kafka-consumer-groups is just a wrapper around a Scala class. You can call it from code. – OneCricketeer
Alternatively, you can set up external monitoring like Burrow or Prometheus to track all group lags. – OneCricketeer

1 Answer

2
votes

The correct way to achieve this programmatically is to use the AdminClient API:

  1. Get committed offsets for the group using listConsumerGroupOffsets().

  2. Get log end offsets. At the moment you need to start a Consumer and call endOffsets() for all partitions retrieved in step 1.

    In Kafka 2.5 (expected end of February 2020), there is a new AdminClient API, listOffsets(), that retrieves log end offsets, making it possible to compute the lag using the AdminClient alone.

  3. For each partition, subtract the committed offset (step 1) from the log end offset (step 2).

This is basically what kafka-consumer-groups.sh does under the covers. So check the implementation of this tool if you want.
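
As a rough sketch of step 3 and of the summing the question asks for, the helper below takes the two offset maps and returns per-partition and total lag. The class name GroupLag and its methods are hypothetical and not part of the Kafka client library. The key type K stands in for TopicPartition so that the block runs without Kafka on the classpath. In real use, the committed offsets would presumably come from AdminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get() (taking offset() from each OffsetAndMetadata), and the end offsets from KafkaConsumer.endOffsets(partitions) or, from Kafka 2.5, from AdminClient.listOffsets() with OffsetSpec.latest(). Skipping partitions that have no committed offset and clamping negative values to zero are assumptions of this sketch, not behavior confirmed by the tool.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class GroupLag {

    // Per-partition lag = log end offset - committed offset.
    // K stands in for org.apache.kafka.common.TopicPartition.
    static <K> Map<K, Long> lagPerPartition(Map<K, Long> committed, Map<K, Long> endOffsets) {
        Map<K, Long> lag = new HashMap<>();
        for (Map.Entry<K, Long> entry : committed.entrySet()) {
            Long committedOffset = entry.getValue();
            Long endOffset = endOffsets.get(entry.getKey());
            if (committedOffset == null || endOffset == null) {
                // Assumption: partitions without a committed or end offset are skipped.
                continue;
            }
            // Assumption: clamp at zero in case the two maps were read at slightly different times.
            lag.put(entry.getKey(), Math.max(0L, endOffset - committedOffset));
        }
        return lag;
    }

    // Total lag of the group: the sum of the per-partition lags.
    static <K> long totalLag(Map<K, Long> committed, Map<K, Long> endOffsets) {
        long total = 0L;
        for (long partitionLag : lagPerPartition(committed, endOffsets).values()) {
            total += partitionLag;
        }
        return total;
    }
}
```

With the committed and end offsets collected for every partition of the group (not only those assigned to one consumer), GroupLag.totalLag(committed, endOffsets) gives the single number the question is after.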