I am looking for a way to consume a set of messages from my Kafka topic within a specific offset range (for example, if my partition holds offsets 200 to 300, I want to consume only the messages at offsets 250 to 270).

I am using the code below, where I can specify the initial offset, but it consumes every message from offset 250 to the end of the partition. Is there an attribute, or any other way, to set an end offset so that consumption stops at that point?

    @KafkaListener(id = "KafkaListener",
        topics = "${kafka.topic.name}", 
        containerFactory = "kafkaManualAckListenerContainerFactory", 
        errorHandler = "${kafka.error.handler}",
        topicPartitions = @TopicPartition(topic = "${kafka.topic.name}",
        partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "250"), 
                @PartitionOffset(partition = "1", initialOffset = "250")
        }))

2 Answers

0
votes
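KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

// Partition to read from; "myTopic" and partition 0 are placeholders.
// assign() must be called before seek().
TopicPartition partitionToReadFrom = new TopicPartition("myTopic", 0);
kafkaConsumer.assign(Collections.singletonList(partitionToReadFrom));

boolean keepOnReading = true;
int numberOfMessagesRead = 0;

// offsets to read the data from and up to (inclusive)
long offsetToReadFrom = 250L;
long offsetToReadUpTo = 270L;

// seek is mostly used to replay data or fetch a specific message
kafkaConsumer.seek(partitionToReadFrom, offsetToReadFrom);

while (keepOnReading) {
   ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

   for (ConsumerRecord<String, String> record : records) {
      // only process records inside the requested range
      if (record.offset() <= offsetToReadUpTo) {
         numberOfMessagesRead++;
         logger.info("Key: " + record.key() + ", Value: " + record.value());
         logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
      }

      // stop once the end offset is reached or passed (offsets can have gaps)
      if (record.offset() >= offsetToReadUpTo) {
        keepOnReading = false;
        break;
      }
   }
}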
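
A note on the stop condition: offsets delivered to a consumer are not guaranteed to be contiguous (log compaction and transaction markers can leave gaps), so a record with exactly offset 270 may never arrive. Below is a minimal sketch of a range check that tolerates such gaps. It uses plain `long` offsets in place of `ConsumerRecord` so it runs without a broker; `RangeCheck` and `takeRange` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeCheck {
    /** Keeps offsets in [start, end] and stops at the first offset past end. */
    public static List<Long> takeRange(long[] polledOffsets, long start, long end) {
        List<Long> kept = new ArrayList<>();
        for (long offset : polledOffsets) {
            if (offset > end) {
                break; // past the range, even if `end` itself never showed up
            }
            if (offset >= start) {
                kept.add(offset);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Offsets 270 and 271 are missing, as they could be after compaction.
        long[] polled = {268L, 269L, 272L, 273L};
        System.out.println(takeRange(polled, 250L, 270L)); // prints [268, 269]
    }
}
```

With an exact `== 270` comparison the same input would never trigger the stop.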

I hope this helps!

0
votes

You can use seek() in order to force the consumer to start consuming from a specific offset and then poll() until you reach the target end offset.

public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets.


For example, let's assume you want to read from offset 200 up to offset 300:

TopicPartition tp = new TopicPartition("myTopic", 0);
long startOffset = 200L;
long endOffset = 300L;

// assign() the partition manually, then seek() to the start offset
List<TopicPartition> partitions = Arrays.asList(tp);
consumer.assign(partitions);
consumer.seek(tp, startOffset);

Now you just need to keep calling poll() until endOffset is reached:

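boolean run = true;
while (run) {
    // poll(long) is deprecated; use the Duration overload instead
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {

        if (record.offset() <= endOffset) {
            // Do whatever you want to do with `record`
        }

        // Check if end offset has been reached or passed (offsets can have gaps)
        if (record.offset() >= endOffset) {
            run = false;
            break;
        }
    }
}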
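
The question reads from two partitions, and in that case a single `run` flag is not enough: each partition has to reach its own end offset before the loop can stop. Here is a minimal sketch of that bookkeeping, keyed by partition number and free of Kafka types so it stands alone. `OffsetRangeTracker`, `accept`, and `isDone` are hypothetical helpers, not part of the Kafka client.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OffsetRangeTracker {
    private final Map<Integer, Long> endOffsets; // partition -> last offset to consume (inclusive)
    private final Set<Integer> pending;          // partitions that have not reached their end yet

    public OffsetRangeTracker(Map<Integer, Long> endOffsets) {
        this.endOffsets = Map.copyOf(endOffsets);
        this.pending = new HashSet<>(endOffsets.keySet());
    }

    /** Returns true if the record is in range; marks the partition finished at or past its end. */
    public boolean accept(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null || !pending.contains(partition)) {
            return false; // untracked or already finished partition
        }
        if (offset >= end) {
            pending.remove(partition); // reached or passed the end, gaps included
        }
        return offset <= end;
    }

    /** True once every tracked partition has reached its end offset. */
    public boolean isDone() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        OffsetRangeTracker tracker = new OffsetRangeTracker(Map.of(0, 270L, 1, 270L));
        System.out.println(tracker.accept(0, 269L)); // true
        System.out.println(tracker.accept(0, 271L)); // false: offset 270 was missing, partition 0 is finished
        System.out.println(tracker.accept(1, 270L)); // true: partition 1 is finished
        System.out.println(tracker.isDone());        // true
    }
}
```

In the poll loop you would call `accept(record.partition(), record.offset())` for each record, process it only when the call returns true, and leave the loop once `isDone()` returns true. With a real consumer, calling `pause()` on a finished partition should avoid fetching records that would only be discarded.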