
I have a consumer that consumes a message, does some heavy work, and then acks.

    @KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory = "ContainerFactory")
    public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {
        try {
            // Heavy job
            ack.acknowledge();
        } catch (Exception e) {
            log("Error in Kafka Consumer");
        }
    }

Now, if there is an exception, execution should go to the catch block and the acknowledgement should not happen; and if the acknowledgement does not happen, the message should go back to the queue and be processed again. But that is not happening: the offset updates and the next message is picked up. I understand that the consumer has a poll size that lets it pick up more than one message at a time, but even if one message is not acked, it should be reprocessed instead of being ignored while the offset moves on.

Here is the Kafka consumer config:

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
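The container factory referenced in the listener is not shown; a minimal sketch of a manual-ack factory (assuming Spring Kafka 2.3+, String records, and placeholder broker and group values that are not taken from the setup above) could look like this:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;

    @Configuration
    public class KafkaConsumerConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");                   // assumed group
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean("ContainerFactory")
        public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Manual ack mode so that ack.acknowledge() in the listener controls offset commits
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }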

1 Answer


This is the expected behaviour of the underlying KafkaConsumer.

Under the covers, the KafkaConsumer uses the poll API, which is described in the JavaDocs as:

"On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions."

This means it does not check for the last committed offset but rather for the last consumed offset, and then fetches the data sequentially. Only when restarting your job will it continue reading from the last committed offset for that consumer group, or, if you use a new consumer group, from the position given by the auto.offset.reset configuration.
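To illustrate this with a bare KafkaConsumer (a minimal sketch with an assumed broker address, topic, and group, not taken from your setup): even with auto-commit disabled and no commits at all, each subsequent poll continues past the records already consumed in the same session.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PollBehaviourDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"));      // assumed topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // Nothing is committed here, yet the next poll() continues past
                        // these records; only a restart of this process resumes from the
                        // last committed offset for the group.
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                }
            }
        }
    }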

To solve your problem, I see the following solutions that you can apply in the catch block:

  • Instead of just logging "Error in Kafka Consumer", shut your job down, fix the code, and re-start your application.
  • Use the offset of the record that caused the exception to re-position your consumer to that same offset using the seek API (a sketch of this follows below). Details on the seek method can be found here.
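A rough sketch of the second option inside a Spring Kafka listener could look like the following (the String payload, the header constants, and the injected Consumer parameter are assumptions and depend on your Spring Kafka version):

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;

    public class RetryingConsumer {

        @KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory = "ContainerFactory")
        public void consumeMessage(@Payload String message,
                                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                   @Header(KafkaHeaders.OFFSET) long offset,
                                   Acknowledgment ack,
                                   Consumer<?, ?> consumer) {
            try {
                // Heavy job
                ack.acknowledge();
            } catch (Exception e) {
                // Re-position the consumer so the failed record is fetched again
                // on the next poll instead of being skipped.
                consumer.seek(new TopicPartition(topic, partition), offset);
            }
        }
    }

Newer Spring Kafka versions also ship error handlers (for example SeekToCurrentErrorHandler) that perform this kind of re-seek for you, which may be simpler than doing it by hand in the catch block.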