3
votes

I have a Python script and need to retrieve the current consumer group offsets for a set of consumers reading from a Kafka topic, using a kafka1 broker cluster. These are native Kafka consumers that store their offsets in the Kafka cluster, not in ZooKeeper.

The script itself does not need to consume any messages, just read the current offsets of other consumers. I realise this is possible with kafka-consumer-groups.sh, but ideally I want to avoid relying on shell commands.

I can already do this with the dpkp/kafka-python client, but only by creating a consumer and assigning it to the group, which affects the existing consumers in that group by de-allocating some of their partitions. I need the script to be entirely passive and not perform any action that interrupts the other consumers.


2 Answers

3
votes

linkedin/kafka-tools has a function get_offsets_for_group() for getting group offsets. It can be passed a group name and topic name, or just a group name to retrieve committed offsets for all topics for that group.

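from kafka.tools.client import Client

group = 'mygroup'

# Connect to the cluster (9092 is the default Kafka broker port; adjust as needed)
client = Client(broker_list='localhost:9092')
client.connect()

# Committed offsets for every topic the group has stored offsets for
offsets = client.get_offsets_for_group(group)

for topic in offsets:
    # Assumes .partitions is a list of offsets indexed by partition number
    for partition, offset in enumerate(offsets[topic].partitions):
        print("group: {0} - topic: {1} - partition: {2} - offset: {3}".format(group, topic, partition, offset))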

1
vote

Using dpkp/kafka-python, you can retrieve committed offsets for a specific group by sending an OffsetFetchRequest. If you use OffsetFetchRequest_v3, you can pass None for the topics argument to get offsets for all topics/partitions the group has stored offsets for.

For example:

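import socket

from kafka import BrokerConnection
from kafka.protocol.commit import OffsetFetchRequest_v3

group = 'mygroup'

# Open a plain connection to one broker; no consumer joins the group
bc = BrokerConnection('localhost', 9092, socket.AF_INET)
bc.connect_blocking()

# topics=None requests offsets for every topic/partition the group has committed
fetch_offset_request = OffsetFetchRequest_v3(group, None)

# Send the request and poll the connection until the response arrives
future = bc.send(fetch_offset_request)
while not future.is_done:
    for resp, f in bc.recv():
        f.success(resp)

# Each topic entry is (topic_name, partitions); each partition entry starts with (partition, offset)
for topic in future.value.topics:
    print('offsets for {0}'.format(topic[0]))
    for partition in topic[1]:
        print('- partition {0}, offset: {1}'.format(partition[0], partition[1]))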

If mygroup has committed offsets for two topics named topic and topic2, it will print something like:

offsets for topic2
- partition 0, offset: 10
- partition 1, offset: 10
- partition 2, offset: 10
offsets for topic
- partition 0, offset: 3
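
If you would rather look offsets up by topic and partition than print them, the nested tuples can be flattened into a dictionary. This is only a minimal sketch: offsets_by_topic is a hypothetical helper, not part of kafka-python, and it assumes each partition entry is a tuple that starts with (partition, offset), as in the loop above. The sample data is hand-written to mirror the output shown, so the snippet runs without a broker.

```python
from collections import defaultdict


def offsets_by_topic(topics):
    """Flatten (topic, [(partition, offset, ...), ...]) tuples into nested dicts.

    Hypothetical helper; assumes each partition entry starts with
    (partition, offset) and ignores any remaining fields.
    """
    result = defaultdict(dict)
    for topic_name, partitions in topics:
        for entry in partitions:
            partition, offset = entry[0], entry[1]
            result[topic_name][partition] = offset
    return dict(result)


# Hand-written sample shaped like future.value.topics; not real broker data.
sample = [
    ('topic2', [(0, 10, '', 0), (1, 10, '', 0), (2, 10, '', 0)]),
    ('topic', [(0, 3, '', 0)]),
]

committed = offsets_by_topic(sample)
print(committed['topic2'][1])  # offset committed for topic2, partition 1
```

With a live connection you would pass future.value.topics instead of sample.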