38
votes

I'm writing a Kafka consumer in Java. I want to process messages close to real time, so if too many messages are waiting to be consumed (say 1000 or more), I want to abandon the unconsumed messages and start consuming from the latest offset.

To do this, I try to compare the last committed offset with the end offset of the topic (it has only one partition). If the difference between the two is larger than a certain amount, I will use the end offset as the next offset to consume from, so that the backlog of stale messages is skipped.
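
The comparison described above comes down to simple arithmetic on two offsets. Here is a minimal sketch, assuming the committed offset and the end offset have already been fetched from the consumer. `LagPolicy`, `nextOffset`, and `maxLag` are hypothetical names used only for illustration; they are not part of the Kafka API.

```java
final class LagPolicy {
    private LagPolicy() {}

    /**
     * Returns the offset to resume from: the end offset if the backlog
     * is larger than maxLag, otherwise the committed offset.
     * All three parameters are assumptions supplied by the caller.
     */
    static long nextOffset(long committedOffset, long endOffset, long maxLag) {
        long lag = endOffset - committedOffset; // messages not yet consumed
        return lag > maxLag ? endOffset : committedOffset;
    }

    public static void main(String[] args) {
        // Backlog of 1400 exceeds the limit of 1000: skip to the end offset.
        System.out.println(nextOffset(100L, 1500L, 1000L));
        // Backlog of 500 is within the limit: continue from the committed offset.
        System.out.println(nextOffset(100L, 600L, 1000L));
    }
}
```

The returned value would then be passed to the consumer's seek call for the partition; fetching the end offset itself is the open question below.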

Now my problem is how to get the end offset of a topic. Some people say I can use the old consumer, but it's too complicated. Does the new consumer have this functionality?

6 Answers

36
votes

The new consumer is also complicated.

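TopicPartition topicPartition = new TopicPartition(topic, 0);

// assign the topic partition to the consumer
consumer.assign(Collections.singletonList(topicPartition));

// seek to the end of the partition
consumer.seekToEnd(Collections.singletonList(topicPartition));

// the position is now the latest offset
long endOffset = consumer.position(topicPartition);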

27
votes

You can also use the command-line tools that ship with the Kafka server:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name

Output is of the form <topicName>:<partitionID>:<offset>, e.g. t1:0:0, see https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-tools-GetOffsetShell.html for further details.
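
If you want to use that output from Java, it can be parsed with a few lines of code. This is only a sketch that assumes the `<topicName>:<partitionID>:<offset>` format described above, one entry per line; `OffsetShellOutput` and `parse` are hypothetical names, and other Kafka versions may print a different format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class OffsetShellOutput {
    private OffsetShellOutput() {}

    /** Parses lines such as "t1:0:42" into a map keyed by "topic-partition". */
    static Map<String, Long> parse(String output) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String line : output.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore blank lines
            }
            // Split from the right so the last two fields are partition and offset.
            int lastColon = trimmed.lastIndexOf(':');
            int middleColon = trimmed.lastIndexOf(':', lastColon - 1);
            if (middleColon < 0) {
                throw new IllegalArgumentException("Unexpected line: " + trimmed);
            }
            String topic = trimmed.substring(0, middleColon);
            int partition = Integer.parseInt(trimmed.substring(middleColon + 1, lastColon));
            long offset = Long.parseLong(trimmed.substring(lastColon + 1));
            offsets.put(topic + "-" + partition, offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Sample text in the format described above, not real tool output.
        System.out.println(parse("t1:0:0\nt1:1:42\n"));
    }
}
```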

14
votes

For Kafka version 0.10.1.1:

// Get the diff between the current position and the latest offset.
// `record` is the ConsumerRecord currently being processed.
Set<TopicPartition> partitions = new HashSet<>();
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition());
partitions.add(actualTopicPartition);
Long actualEndOffset = consumer.endOffsets(partitions).get(actualTopicPartition);
long actualPosition = consumer.position(actualTopicPartition);
System.out.println(String.format("diff: %s   (actualEndOffset:%s; actualPosition=%s)", actualEndOffset - actualPosition, actualEndOffset, actualPosition));

5
votes

I developed the code below (in Scala) to fetch the start and end offsets of every partition of a topic:

import java.util
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

class GetOffsetRange(consumer:KafkaConsumer[String,String]) {

  def getStartOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={

    val topicPartitionList = consumer.partitionsFor(topic)
    val partitionMap=new util.HashMap[TopicPartition,Long]()
    val arrTopic=new util.ArrayList[TopicPartition]()

    consumer.subscribe(Collections.singletonList(topic));

    for(topic<-topicPartitionList.asScala){
      println(topic.topic() +","+topic.partition())
      arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
    }

    consumer.poll(0)

    // after seeking to the beginning, position() is the earliest available offset
    consumer.seekToBeginning(arrTopic)

    for(partition <- arrTopic.asScala){
      partitionMap.put(partition,consumer.position(partition))
    }
    return partitionMap
  }

  def getEndOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={

    val topicPartitionList = consumer.partitionsFor(topic)
    val partitionMap=new util.HashMap[TopicPartition,Long]()
    val arrTopic=new util.ArrayList[TopicPartition]()

    consumer.subscribe(Collections.singletonList(topic));

    for(topic<-topicPartitionList.asScala){
      println(topic.topic() +","+topic.partition())
      arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
    }

    consumer.poll(0)

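    // after seeking to the end, position() is the offset of the next message to be written,
    // so subtracting 1 gives the offset of the last existing message
    consumer.seekToEnd(arrTopic)

    for(partition <- arrTopic.asScala){
      partitionMap.put(partition,consumer.position(partition)-1)
    }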
    return partitionMap
  }
}
4
votes

As of Kafka 1.0.1 (the method was in fact introduced earlier, in 0.10.1), the consumer has a method called endOffsets:

public java.util.Map<TopicPartition, java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)

Please let me know if you need the full code.

Please refer to the Apache Kafka 1.0.1 Javadoc.

3
votes
KafkaConsumer<String, String> consumer = ...
consumer.subscribe(Collections.singletonList(topic));
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.poll(0);
consumer.seekToEnd(Collections.singletonList(topicPartition));
long currentOffset = consumer.position(topicPartition) -1;

The snippet above returns the offset of the last message written to the given topic and partition number (the end offset minus one). Note that this is not the offset committed by the consumer group.