0
votes

I'm using the new Consumer API in Kafka 0.9.

I'm letting Kafka take care of the offsets for the consumers. I have consumers running on multiple machines reading from the same topic.

I'm trying to find out the following:

  • The consumers registered with a consumer-group
  • The offset of each consumer

I thought the consumer-group to consumer relationship would be stored in ZooKeeper. I can see the consumers node in ZooKeeper, but it has no children.

As far as I can tell from looking through the code, the offsets are being written into Kafka, but I can't tell which topic they're being written to.

3 Answers

2
votes

There seem to be at least two types of key-value pairs stored in the __consumer_offsets topic that @nautilus pointed out:

  1. Group metadata information
  2. Offset commits

Kafka uses its own schema and serialization as far as I can tell. You can find out more about these structures by looking through kafka.coordinator.GroupMetadataManager:

  • GroupMetadataManager.OFFSET_COMMIT_KEY_SCHEMA
  • GroupMetadataManager.OFFSET_COMMIT_VALUE_SCHEMA_V0
  • GroupMetadataManager.GROUP_METADATA_KEY_SCHEMA
  • GroupMetadataManager.GROUP_METADATA_VALUE_SCHEMA_V0
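
To give a feel for what these schemas mean on the wire, here is a minimal sketch that decodes an offset commit value with nothing but java.nio. It assumes the 0.9 layout as I read it: a 2-byte version, then offset (INT64), metadata (a STRING, i.e. 2-byte length plus UTF-8 bytes) and a timestamp (INT64), with version 1 adding an expire timestamp after that. The class and helper names are made up for illustration, and encodeV0 exists only to produce sample bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetValueDecoder {

    /** The fields this sketch pulls out of an offset commit value. */
    static final class OffsetValue {
        final short version;
        final long offset;
        final String metadata;
        final long commitTimestamp;

        OffsetValue(short version, long offset, String metadata, long commitTimestamp) {
            this.version = version;
            this.offset = offset;
            this.metadata = metadata;
            this.commitTimestamp = commitTimestamp;
        }
    }

    /** Decodes a value under the ASSUMED layout described in the lead-in. */
    static OffsetValue decode(byte[] rawValue) {
        if (rawValue == null) {
            return null; // no payload at all; treated here as "nothing to decode"
        }
        ByteBuffer bb = ByteBuffer.wrap(rawValue); // big-endian by default
        short version = bb.getShort();
        if (version < 0 || version > 1) {
            throw new IllegalArgumentException("Unknown value version " + version);
        }
        long offset = bb.getLong();
        byte[] meta = new byte[bb.getShort()];     // STRING: int16 length + bytes
        bb.get(meta);
        long timestamp = bb.getLong();
        // Version 1 is assumed to carry one more INT64 (expire timestamp); ignored here.
        return new OffsetValue(version, offset,
                new String(meta, StandardCharsets.UTF_8), timestamp);
    }

    /** Hypothetical helper: builds a version-0 value so the sketch can be tried out. */
    static byte[] encodeV0(long offset, String metadata, long timestamp) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer bb = ByteBuffer.allocate(2 + 8 + 2 + meta.length + 8);
        bb.putShort((short) 0).putLong(offset);
        bb.putShort((short) meta.length).put(meta);
        bb.putLong(timestamp);
        return bb.array();
    }

    public static void main(String[] args) {
        OffsetValue v = decode(encodeV0(42L, "", 1000L));
        System.out.println("offset=" + v.offset + " committed at " + v.commitTimestamp);
    }
}
```

If the layout in your broker version differs, the schema constants listed above are the place to check.
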
1
votes

As @hba mentions, the details of encoding/decoding are found in kafka.coordinator.GroupMetadataManager near the bottom. Look for readMessageKey and the two following methods. Basically, what you need is a sequence of calls like

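import java.nio.ByteBuffer;
import org.apache.kafka.common.protocol.types.Type;
...
// consumerRecord is assumed to be a ConsumerRecord<byte[], byte[]> read from __consumer_offsets
ByteBuffer bb = ByteBuffer.wrap(consumerRecord.key());
short version = bb.getShort();                  // schema version of this key
// the three reads below apply to offset commit keys only
String group = (String) Type.STRING.read(bb);
String topic = (String) Type.STRING.read(bb);
int partition = (Integer) Type.INT32.read(bb);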

The nice thing is that org.apache.kafka.common.protocol.types.Type is part of the Java client API, independent of the big main jar. The ugly part is that the code snippet above is not complete. There are two versions of each consumerRecord.key() and consumerRecord.value() that come along, and one has to mimic the decoding done in the methods mentioned above. Not a big deal, just a bit tedious.
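
For what it's worth, here is a rough, self-contained sketch of that key decoding, version check included, written against plain java.nio so it runs without any Kafka jar. It rests on my reading of the 0.9 code: key versions 0 and 1 are offset commit keys (group, topic, partition), anything higher is a group metadata key, and a STRING is a 2-byte length followed by UTF-8 bytes. OffsetKeyDecoder, OffsetKey and encode are hypothetical names; encode is only there to build sample input.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetKeyDecoder {

    /** Decoded offset commit key: which group committed for which topic-partition. */
    static final class OffsetKey {
        final short version;
        final String group;
        final String topic;
        final int partition;

        OffsetKey(short version, String group, String topic, int partition) {
            this.version = version;
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Reads a STRING as assumed above: int16 length, then that many UTF-8 bytes. */
    static String readString(ByteBuffer bb) {
        byte[] bytes = new byte[bb.getShort()];
        bb.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Returns the decoded key, or null when the key is not an offset commit key. */
    static OffsetKey decode(byte[] rawKey) {
        ByteBuffer bb = ByteBuffer.wrap(rawKey); // big-endian by default
        short version = bb.getShort();
        if (version < 0 || version > 1) {
            return null; // ASSUMPTION: version 2 and up means group metadata
        }
        String group = readString(bb);
        String topic = readString(bb);
        int partition = bb.getInt();
        return new OffsetKey(version, group, topic, partition);
    }

    /** Hypothetical helper: builds a key in the assumed layout, for trying things out. */
    static byte[] encode(short version, String group, String topic, int partition) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer bb = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4);
        bb.putShort(version);
        bb.putShort((short) g.length).put(g);
        bb.putShort((short) t.length).put(t);
        bb.putInt(partition);
        return bb.array();
    }

    public static void main(String[] args) {
        OffsetKey k = decode(encode((short) 1, "my-group", "events", 3));
        System.out.println(k.group + " " + k.topic + "-" + k.partition);
    }
}
```

In a real consumer you would pass consumerRecord.key() to decode and skip the records for which it returns null.
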

If it is OK for your project to depend on the Scala jar, the full Kafka jar and the one or two other jars Kafka needs, you may as well use GroupMetadataManager.readMessageKey(bb) and the other two methods to read the key and value. At least in 0.9.0.1 they are public.

0
votes

If the offset is handled by Kafka, it is not stored in ZooKeeper. It is stored in a topic called "__consumer_offsets", whose partitions show up as "__consumer_offsets-#" directories in the kafka-logs folder.
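
If you want to know which of those "__consumer_offsets-#" partitions holds the commits of a particular group, the sketch below may help. It assumes, from my reading of the broker code, that the partition is the absolute value of the group id's hashCode modulo the partition count of the offsets topic, which is 50 by default (offsets.topic.num.partitions). The class and method names are made up.

```java
final class OffsetsPartition {

    /**
     * ASSUMPTION: mirrors what the broker appears to do,
     * abs(groupId.hashCode()) % numPartitions, with Integer.MIN_VALUE mapped to 0
     * because Math.abs cannot represent its absolute value.
     */
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default partition count; pass your own if the broker config differs
        System.out.println("__consumer_offsets-" + partitionFor("my-group", 50));
    }
}
```
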

You can find out the offset of each consumer by checking the offset field of each ConsumerRecord returned by poll(). If you want more information about the consumer group, check bin/kafka-consumer-groups.sh.

Hope it helps!