2 votes

I have a Kafka consumer with enable.auto.commit set to false. Whenever I restart my consumer application, it always reads the last committed offset again and then the subsequent offsets.

For example, if the last committed offset is 50, when I restart the consumer it reads offset 50 again first and then the next offsets.

I am performing the commit with commitSync as shown below.

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
// commit the offset of the last record that was processed
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);

I tried setting auto.offset.reset to both earliest and latest, but it does not change the behavior.

Am I missing something in the consumer configuration?

config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

2 Answers

6 votes

If you want to use commitSync(Map<TopicPartition, OffsetAndMetadata>), you have to be careful and read its Javadoc:

The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

If you don't add + 1 to the offset, then on the next restart the consumer is expected to consume the last message again, which is exactly the behavior you are seeing. As mentioned in the other answer, if you use commitSync() without any arguments, you don't have to worry about this.
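
Applied to the snippet in the question, a minimal sketch of the fix (reusing the question's partition, offset, and kafkaconsumer variables, where offset is the offset of the last record you processed) would look like this:

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
// commit the position of the *next* record to consume,
// i.e. lastProcessedMessageOffset + 1
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset + 1));
kafkaconsumer.commitSync(offsets);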

0 votes

It looks like you're trying to commit using new OffsetAndMetadata(offset). That's not the typical usage.

Here's an example from the documentation, under Manual Offset Control:

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        // commit only after the batch has been durably processed
        consumer.commitSync();
        buffer.clear();
    }
}

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Notice how the consumer.commitSync() call is performed without any parameters. Called with no arguments, it commits the offsets of the records returned by the most recent poll, so after a restart the consumer resumes at the first record it has not yet consumed.
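
Applied to the question's consumer, a minimal sketch of that pattern could look like this (assuming String values for simplicity, a hypothetical process method standing in for your record handling, and the poll(Duration) overload available in newer clients):

while (true) {
    ConsumerRecords<String, String> records = kafkaconsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing step
    }
    // no-arg commit: commits the offsets returned by the last poll,
    // so a restart resumes at the first unconsumed record
    kafkaconsumer.commitSync();
}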