0
votes

I have multiple Kafka consumers and producers working with different topics. I want to monitor the lag of each Kafka consumer from a separate, independent application.

I am using Kafka 0.10.0.1. Since Kafka now stores the consumer offsets in Kafka itself, how can I read them?

I am able to read the Topic offset for each partition.

1
You could use the kafka-consumer-groups.sh script, which can show information about the consumer groups. – amethystic
I am using Java; isn't there some helper library or some other way to get it? – Nitesh
Try invoking AdminClient.createSimplePlaintext("localhost:9092").listGroupOffsets – amethystic
@amethystic listGroupOffsets() is not present in the package. Can you share a link or a complete code sample? – Nitesh
@Nitesh, it is present in version 0.10.2.1. – Dani

1 Answer

0
votes

You could write code like this to fetch the offsets for a group:

/**
 * Opens a connection to the given broker, sends a single request over it and
 * returns the response buffer.
 *
 * <p>The socket is obtained from {@code connect(host, port)} and is always closed
 * before this method returns, whether the exchange succeeds or fails.
 *
 * @param host    host name passed to {@code connect}
 * @param port    port passed to {@code connect}
 * @param request request handed to the socket-level {@code send} overload
 * @param apiKey  API key handed to the socket-level {@code send} overload
 * @return the buffer returned by the socket-level {@code send} overload
 * @throws IOException if connecting or the request/response exchange fails
 */
public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    // try-with-resources closes the socket on every path. Unlike a plain
    // try/finally, a failure in close() is recorded as a suppressed exception
    // instead of replacing the exception thrown by the exchange itself.
    try (Socket socket = connect(host, port)) {
        return send(request, apiKey, socket);
    }
}

/**
 * Performs one request/response round trip on an already-connected socket.
 *
 * @param socket  connected socket to write the request to and read the reply from
 * @param request serialized request bytes, passed unchanged to {@code sendRequest}
 * @return the bytes produced by {@code getResponse}
 * @throws IOException if writing the request or reading the response fails
 */
private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    // The request must be fully written before the reply is read.
    sendRequest(socket, request);
    final byte[] reply = getResponse(socket);
    return reply;
}

/**
 * Writes a length-prefixed request to the socket's output stream and flushes it.
 *
 * <p>The frame is the request length written with {@code writeInt}, followed by
 * the request bytes. The stream is intentionally not closed here.
 *
 * @param socket  socket whose output stream receives the frame
 * @param request serialized request bytes
 * @throws IOException if obtaining the stream, writing or flushing fails
 */
private void sendRequest(Socket socket, byte[] request) throws IOException {
    final DataOutputStream framedOut = new DataOutputStream(socket.getOutputStream());
    final int payloadLength = request.length;
    framedOut.writeInt(payloadLength);          // length prefix
    framedOut.write(request, 0, payloadLength); // payload
    framedOut.flush();
}

/**
 * Reads one length-prefixed response from the socket's input stream.
 *
 * <p>The response is a size read with {@code readInt}, followed by exactly that
 * many bytes. The stream is deliberately left open: closing a socket's input
 * stream closes the socket itself, and the socket is owned by the caller.
 *
 * @param socket socket to read the response from
 * @return the response bytes, without the length prefix
 * @throws IOException if reading fails, the stream ends early, or the announced
 *                     size is negative
 */
private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = new DataInputStream(socket.getInputStream());
    int size = dis.readInt();
    // A negative size would otherwise surface as NegativeArraySizeException;
    // report a malformed frame as an I/O problem instead.
    if (size < 0) {
        throw new IOException("Invalid response size: " + size);
    }
    byte[] response = new byte[size];
    dis.readFully(response);
    return response;
}

/**
 * Creates a socket connected to the given host and port.
 *
 * @param hostName host to connect to
 * @param port     port to connect to
 * @return the newly created socket; the caller is responsible for closing it
 * @throws IOException if the socket cannot be created or connected
 */
private Socket connect(String hostName, int port) throws IOException {
    final Socket brokerSocket = new Socket(hostName, port);
    return brokerSocket;
}

/**
 * Serializes a request with its header, sends it over the given socket and
 * returns the response wrapped in a buffer.
 *
 * <p>The header is built from the API key id and the request version, with the
 * fixed client id {@code "client-id"} and correlation id {@code 0}.
 *
 * @param request request to serialize after the header
 * @param apiKey  API key whose {@code id} goes into the request header
 * @param socket  connected socket used for the round trip
 * @return the response buffer after {@code ResponseHeader.parse} has been applied to it
 * @throws IOException if the round trip on the socket fails
 */
private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    final RequestHeader requestHeader = new RequestHeader(apiKey.id, request.version(), "client-id", 0);

    // The buffer is sized exactly for header + body, so its backing array is the full frame.
    final int frameSize = requestHeader.sizeOf() + request.sizeOf();
    final ByteBuffer frame = ByteBuffer.allocate(frameSize);
    requestHeader.writeTo(frame);
    request.writeTo(frame);

    final byte[] rawResponse = issueRequestAndWaitForResponse(socket, frame.array());
    final ByteBuffer responseBuffer = ByteBuffer.wrap(rawResponse);
    // The parsed header is discarded; presumably this call is made to advance the
    // buffer past the response header - TODO confirm against ResponseHeader.parse.
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
}

/**
 * Fetches the offset of one topic partition for a consumer group and prints it
 * to standard output.
 *
 * <p>The request is built with version 2 and sent to {@code localhost:9092}.
 *
 * @param groupID   consumer group whose offset is requested
 * @param topic     topic name
 * @param parititon partition number within the topic
 * @throws IOException if the exchange with the broker fails, or if the response
 *                     contains no entry for the requested partition
 */
public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp)).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    // Map.get returns null when the response has no entry for this partition;
    // fail with context rather than a bare NullPointerException.
    if (partitionData == null) {
        throw new IOException("No offset data returned for " + tp + " in group " + groupID);
    }
    System.out.println(partitionData.offset);
}

/**
 * Fetches the offsets of all topics for a consumer group.
 *
 * <p>The request is built with version 2 and a {@code null} partition list, and
 * is sent to {@code localhost:9092}.
 *
 * @param groupID consumer group whose offsets are requested
 * @return the response data of the parsed response, keyed by topic partition
 * @throws IOException if the exchange with the broker fails
 */
public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    final OffsetFetchRequest fetchAllRequest =
            new OffsetFetchRequest.Builder(groupID, null)
                    .setVersion((short)2)
                    .build();
    final ByteBuffer rawResponse = send("localhost", 9092, fetchAllRequest, ApiKeys.OFFSET_FETCH);
    final OffsetFetchResponse parsed = OffsetFetchResponse.parse(rawResponse, fetchAllRequest.version());
    return parsed.responseData();
}