
We are using a Spring @KafkaListener that acknowledges each record after it is written to the DB. If the DB write fails, we do not acknowledge the record, so the offset is not committed for the consumer. This works fine. Now we want the failed messages to be redelivered on the next poll so we can retry them. We added an error handler to the listener (a ConsumerAwareListenerErrorHandler) and called consumer.seek() with the offset of the failed message. The expectation is that the next poll returns the failed message, but that is not happening: the next poll fetches only new messages, not the failed ones. The code snippet is given below.

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka.input.stream.topic}",
            containerFactory = "kafkaManualAckListenerContainerFactory",
            errorHandler = "listen3ErrorHandler")
    public void onMessage(ConsumerRecord<Integer, String> record,
            Acknowledgment acknowledgment) throws Exception {
        try {
            // msg, onHandList, recordSuccess and LOGGER are fields declared elsewhere in the class
            msg = JaxbUtil.convertJsonStringToMsg(record.value());
            onHandList = DCMUtil.convertMsgToOnHandDTO(msg);

            TeradataDAO.updateData(onHandList);
            // commit the offset only after a successful DB write
            acknowledgment.acknowledge();
            recordSuccess = true;

            LOGGER.info("Message Saved in Teradata DB");
        } catch (Exception e) {
            // exception is swallowed here, so the offset is simply not committed
            LOGGER.error("Error Processing On Hand Data ", e);
            recordSuccess = false;
        }
    }

    @Bean
    public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
        return (message, exception, consumer) -> {
            this.listen3Exception = exception;
            MessageHeaders headers = message.getHeaders();
            // seek this partition back to the failed record's offset
            consumer.seek(new org.apache.kafka.common.TopicPartition(
                            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                    headers.get(KafkaHeaders.OFFSET, Long.class));
            return null;
        };
    }
}

Container class

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // the listener expects ConsumerRecord<Integer, String>, so deserialize keys as Integer
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1");
    // offsets are committed manually via Acknowledgment
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // each record is acknowledged explicitly by the listener
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;
}

2 Answers

Answer 1

It's supposed to work like this:

The error handler needs to throw an exception if you want to discard the remaining records from the previous poll.

Since you are "handling" the error, the container knows nothing about it and will continue to call the listener with the remaining records from the poll.

That said, I see that the container is currently ignoring an exception thrown by the error handler (it discards the remaining records only if the error handler throws an Error, not an exception). I will open an issue for this.

Another workaround would be to add the Consumer to the listener method signature, do the seek there, and then throw an exception. If there is no error handler, the rest of the batch is discarded; a sketch follows.
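A minimal sketch of that workaround, assuming spring-kafka 2.x (JaxbUtil, DCMUtil and TeradataDAO are the question's own helpers):

// imports assumed: org.apache.kafka.clients.consumer.Consumer,
//                  org.apache.kafka.common.TopicPartition
@KafkaListener(topics = "${kafka.input.stream.topic}",
        containerFactory = "kafkaManualAckListenerContainerFactory")
public void onMessage(ConsumerRecord<Integer, String> record,
        Acknowledgment acknowledgment,
        Consumer<Integer, String> consumer) {
    try {
        TeradataDAO.updateData(DCMUtil.convertMsgToOnHandDTO(
                JaxbUtil.convertJsonStringToMsg(record.value())));
        // commit the offset only on success
        acknowledgment.acknowledge();
    } catch (Exception e) {
        // rewind this partition so the failed record is redelivered on the next poll
        consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
        // rethrow so the container discards the remaining records from this poll
        throw new RuntimeException("DB write failed; seeking back for retry", e);
    }
}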

Correction

If the container has no ErrorHandler, any Throwable thrown by a ListenerErrorHandler will cause the remaining records to be discarded.
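Applied to the question's error handler, that means seeking and then rethrowing. A sketch, assuming no container-level ErrorHandler is configured (ListenerExecutionFailedException is unchecked, so it can be rethrown from the lambda):

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (message, exception, consumer) -> {
        MessageHeaders headers = message.getHeaders();
        // seek the failed record's partition back to its offset
        consumer.seek(new org.apache.kafka.common.TopicPartition(
                        headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                        headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        // rethrow so the remaining records from the current poll are discarded
        throw exception;
    };
}

Note that for this error handler to run at all, onMessage() must let the exception propagate rather than swallow it in a catch block, as the posted code does.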

Answer 2

Please try using SeekToCurrentErrorHandler. The doc says: "This allows implementations to seek all unprocessed topic/partitions so the current record (and the others remaining) will be retrieved by the next poll. The SeekToCurrentErrorHandler does exactly this. The container will commit any pending offset commits before calling the error handler." A configuration sketch follows. https://docs.spring.io/autorepo/docs/spring-kafka-dist/2.1.0.BUILD-SNAPSHOT/reference/htmlsingle/#_seek_to_current_container_error_handlers
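A minimal configuration sketch, assuming spring-kafka 2.x where SeekToCurrentErrorHandler is available and is set via the container properties (later versions let you set it directly on the factory). With this handler in place the listener should rethrow failures instead of swallowing them, and the listener-level errorHandler attribute is no longer needed:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    // on a listener exception, seek all unprocessed partitions back so the
    // failed record (and the ones after it) are redelivered on the next poll
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}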