I'm trying to use the low-level Java consumer API to manage offsets manually, with the latest Kafka release, kafka_2.10-0.8.2.1. To verify that the offsets I commit to and read from Kafka are correct, I use the kafka.tools.ConsumerOffsetChecker tool.
Here is an example of the output for my topic/consumer group:
```
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group                 Topic         Pid  Offset  logSize  Lag  Owner
elastic_search_group  my_log_topic  0    5       29       24   none
```
Here is my interpretation of the result:
- Offset = 5 --> the current offset of my 'elastic_search_group' consumer
- logSize = 29 --> the latest offset, i.e. the offset that the next message written to this topic/partition will get
- Lag = 24 --> 29 - 5, i.e. how many messages my 'elastic_search_group' consumer has not processed yet
- Pid --> the partition ID
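Under this interpretation, Lag is just logSize minus Offset. A minimal sketch of that arithmetic with the numbers from the output above; the `OffsetLag` class and `lag` method are hypothetical helpers, not part of Kafka:

```java
// Hypothetical helper (not part of the Kafka API) that reproduces the
// Lag column of ConsumerOffsetChecker from the Offset and logSize columns.
public class OffsetLag {

    /** Lag = logSize (latest offset) - current consumer offset. */
    static long lag(long logSize, long currentOffset) {
        return logSize - currentOffset;
    }

    public static void main(String[] args) {
        long offset = 5;   // "Offset" column for partition 0
        long logSize = 29; // "logSize" column for partition 0
        System.out.println("Lag = " + lag(logSize, offset)); // prints "Lag = 24"
    }
}
```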
Q1: Is this correct?
Now I want to get the same information from my Java consumer. I found that I had to use two different APIs: kafka.javaapi.OffsetRequest to get the earliest and latest offsets, but kafka.javaapi.OffsetFetchRequest to get the current offset.
To get the earliest (or latest) offset, I do:
```java
import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
// Ask for one offset (maxNumOffsets = 1) before EarliestTime(); use LatestTime() for the latest offset
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(myTopic, myPartition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
```
And to get the current offset I have to use a completely different API:
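```java
import java.util.ArrayList;
import java.util.List;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;

short versionID = 0;    // OffsetFetchRequest version (see Q3)
int correlationId = 0;  // echoed back by the broker in the response
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
// Arguments: groupId, partitions, versionId, correlationId, clientId
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
        kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
```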
Q2: Is this correct? Why are there two different APIs to get very similar information?
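Whichever APIs provide them, the three numbers are only consistent if earliest <= current <= latest. A minimal sketch of a hypothetical value class (not part of the Kafka API) that holds the results of both requests and checks that invariant, fed with the values my code returns (current = 5, earliest = 29, latest = 29):

```java
// Hypothetical value class (not part of the Kafka API) holding the results
// of both requests: earliest/latest from OffsetRequest (getOffsetsBefore)
// and the group's current offset from OffsetFetchRequest (fetchOffsets).
public class PartitionOffsets {

    final long earliest; // oldest offset still available in the log
    final long latest;   // offset the next produced message will get
    final long current;  // offset stored for the consumer group

    PartitionOffsets(long earliest, long latest, long current) {
        this.earliest = earliest;
        this.latest = latest;
        this.current = current;
    }

    /** True if the current offset points into the retained log or at its end. */
    boolean currentInRange() {
        return earliest <= current && current <= latest;
    }

    public static void main(String[] args) {
        PartitionOffsets p = new PartitionOffsets(29, 29, 5);
        System.out.println("current offset in range: " + p.currentInRange());
    }
}
```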
Q3: Does it matter which versionId and correlationId I use here? I thought versionId should be 0 for Kafka versions before 0.8.2.1 and 1 for 0.8.2.1 and later, but it seems to work with 0 on 0.8.2.1 as well (see below).
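For reference, the Kafka protocol guide describes OffsetFetchRequest v0 and v1 as identical on the wire, with v0 (0.8.1 and later) reading offsets from ZooKeeper and v1 (0.8.2 and later) reading them from Kafka; the correlationId is an arbitrary value that the broker echoes back so a client can match responses to requests. A minimal sketch of choosing both values under that assumption; `OffsetFetchParams`, `versionFor` and `nextCorrelationId` are hypothetical helpers, not part of the Kafka API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helpers (not part of the Kafka API). Assumes the 0.8.2
// protocol semantics: OffsetFetchRequest v0 reads offsets from ZooKeeper,
// v1 reads them from Kafka; the correlationId is only echoed back.
public class OffsetFetchParams {

    enum OffsetStorage { ZOOKEEPER, KAFKA }

    private static final AtomicInteger CORRELATION_IDS = new AtomicInteger();

    /** versionId to pass to OffsetFetchRequest for the given offset storage. */
    static short versionFor(OffsetStorage storage) {
        switch (storage) {
            case ZOOKEEPER:
                return 0; // v0, available since 0.8.1
            case KAFKA:
                return 1; // v1, available since 0.8.2
            default:
                throw new IllegalArgumentException("unknown storage: " + storage);
        }
    }

    /** A fresh id per request, so responses can be matched to requests. */
    static int nextCorrelationId() {
        return CORRELATION_IDS.getAndIncrement();
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper-stored offsets: versionId = " + versionFor(OffsetStorage.ZOOKEEPER));
        System.out.println("Kafka-stored offsets: versionId = " + versionFor(OffsetStorage.KAFKA));
        System.out.println("correlationId = " + nextCorrelationId());
    }
}
```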
So, for the topic state and the ConsumerOffsetChecker output shown above, here is what my Java code returns:
currentOffset=5; earliestOffset=29; latestOffset=29
'currentOffset' seems to be OK, and 'latestOffset' is correct too, but what about 'earliestOffset'? I would expect it to be no higher than 5, the current offset.
Q4: How can the earliestOffset be higher than the currentOffset? My only suspicion is that messages in the topic were deleted by the retention policy. Are there any other cases in which this could happen?
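Whatever the cause, a current offset outside [earliest, latest] cannot be used as-is: a FetchRequest starting from it gets an OffsetOutOfRange error, so the consumer has to pick a new starting point. A minimal sketch of that decision, assuming a policy that mirrors the "smallest"/"largest" values of the 0.8 high-level consumer's auto.offset.reset setting; `ResumeOffset`, `ResetPolicy` and `resumeFrom` are hypothetical names:

```java
// Hypothetical sketch (not part of the Kafka API): choose where a
// SimpleConsumer should resume when its current offset has fallen
// outside [earliest, latest], e.g. after retention deleted old segments.
public class ResumeOffset {

    // Mirrors the "smallest"/"largest" choices of the 0.8 auto.offset.reset setting.
    enum ResetPolicy { SMALLEST, LARGEST }

    static long resumeFrom(long current, long earliest, long latest, ResetPolicy policy) {
        if (current >= earliest && current <= latest) {
            return current; // still valid: continue where the group left off
        }
        // Out of range: jump to the oldest retained offset or to the log end.
        return policy == ResetPolicy.SMALLEST ? earliest : latest;
    }

    public static void main(String[] args) {
        // Values from the question: current=5, earliest=29, latest=29.
        System.out.println(resumeFrom(5, 29, 29, ResetPolicy.SMALLEST)); // prints 29
    }
}
```

Resetting to the earliest offset re-reads everything still retained, while resetting to the latest offset skips straight to new messages; which one is right depends on whether the consumer can tolerate gaps or duplicates.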