
I am implementing a Spring Kafka batch listener that reads a list of messages from a Kafka topic and posts the data to a REST service. I would like to understand offset management: if the REST service goes down, the offsets for the batch should not be committed, and the messages should be processed again on the next poll. I have read the Spring Kafka documentation, but I am confused about the difference between the Listener Error Handler and the Seek To Current container error handlers for batch listeners. I am using spring-boot-2.0.0.M7, and my code is below.

Listener Config:

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(Integer.parseInt(env.getProperty("spring.kafka.listener.concurrency")));
    // factory.getContainerProperties().setPollTimeout(3000);
    factory.getContainerProperties().setBatchErrorHandler(kafkaErrorHandler());

    // AckMode.BATCH: the container commits once all records returned by poll() have been processed.
    // This only takes effect when enable.auto.commit is false; otherwise the Kafka client commits on its own.
    factory.getContainerProperties().setAckMode(AckMode.BATCH);
    factory.setBatchListener(true);
    return factory;
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            env.getProperty("spring.kafka.consumer.enable-auto-commit"));
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
            env.getProperty("spring.kafka.consumer.auto-commit-interval"));
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("spring.kafka.session.timeout"));
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
    return propsMap;
}

Listener Class:

@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> payloadList) throws Exception {
    if (!payloadList.isEmpty()) {
        // Post to the service; an exception thrown here is passed to the configured BatchErrorHandler
    }
}

Kafka Error Handler:

public class KafkaErrorHandler implements BatchErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
        // Only logs the failure; nothing rewinds the consumer, so the next poll continues after this batch
        LOGGER.info("Exception occurred while processing: " + thrownException.getMessage());
    }

}

How should I handle errors in the Kafka listener so that, if something fails while processing a batch of records, I don't lose data?

Hi. Were you able to find a way to do that? - jbakirov

1 Answer


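With Apache Kafka, we don't lose data just because a consumer has read it: records stay in the partition log (until retention removes them), and each record has an offset, so a consumer can seek to any arbitrary position.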
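To make the "seek to any position" point concrete, here is a toy model in plain Java. It is not the Kafka client API: `PartitionLog`, `append` and `read` are hypothetical names, and a real consumer would call `KafkaConsumer.seek(TopicPartition, long)` instead. It only shows that reading leaves the log intact, so going back to an earlier offset returns the same records.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the Kafka client API): a partition is an append-only list of
// records, and a record's offset is simply its index in that list.
public class PartitionLogDemo {

    static final class PartitionLog {
        private final List<String> records = new ArrayList<>();

        // Appends a record and returns the offset it was written at.
        long append(String value) {
            records.add(value);
            return records.size() - 1;
        }

        // Returns up to maxRecords records starting at fromOffset.
        // Reading never removes anything from the log.
        List<String> read(long fromOffset, int maxRecords) {
            int start = (int) Math.min(fromOffset, records.size());
            int end = Math.min(start + maxRecords, records.size());
            return new ArrayList<>(records.subList(start, end));
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        for (String v : new String[] {"a", "b", "c", "d"}) {
            log.append(v);
        }
        System.out.println(log.read(0, 2)); // first batch
        System.out.println(log.read(2, 2)); // next batch
        // "Seeking" back to offset 0 returns the same records again.
        System.out.println(log.read(0, 2));
    }
}
```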

On the other hand, when we consume records from a partition, there is no requirement to commit their offsets: the current consumer holds its position in memory. Committing is only needed so that other, new consumers in the same group know where to resume when the current one dies. Regardless of any error, the current consumer always moves on and polls new data after its current in-memory offset.
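The difference between the in-memory position and the committed offset can be sketched with another toy model: a hypothetical `ToyConsumer` on a single partition, not the real client. Nothing is committed after the failed batch, yet the next poll still continues from the in-memory position; only a replacement consumer that starts from the committed offset reads the failed records again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the Kafka client API) of one consumer reading one partition.
// position  = in-memory offset of the next record to fetch
// committed = offset a consumer in the same group would resume from after a restart or rebalance
public class PositionVsCommittedDemo {

    static final class ToyConsumer {
        private final List<String> log;
        long position;
        long committed;

        ToyConsumer(List<String> log, long startOffset) {
            this.log = log;
            this.position = startOffset;
            this.committed = startOffset;
        }

        // Like poll(): returns the next records and advances the in-memory position,
        // regardless of whether the previous batch was processed successfully.
        List<String> poll(int maxRecords) {
            int start = (int) position;
            int end = Math.min(start + maxRecords, log.size());
            position = end;
            return new ArrayList<>(log.subList(start, end));
        }

        // Records the current position as the group's committed offset.
        void commit() {
            committed = position;
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2", "m3");
        ToyConsumer consumer = new ToyConsumer(log, 0);

        List<String> first = consumer.poll(2); // suppose posting these to the REST service fails
        // No commit() happened, but the next poll still moves on:
        List<String> second = consumer.poll(2);
        System.out.println(first + " " + second
                + " position=" + consumer.position + " committed=" + consumer.committed);

        // Only a new consumer starting from the committed offset sees m0 and m1 again.
        ToyConsumer replacement = new ToyConsumer(log, consumer.committed);
        System.out.println(replacement.poll(2));
    }
}
```

Running `main` prints `[m0, m1] [m2, m3] position=4 committed=0` and then `[m0, m1]`: the failed records are skipped by the running consumer and only come back for a consumer that resumes from the committed offset.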

So, to reprocess the same data in the same consumer, we have to use a seek operation to move the consumer back to the desired position. That's why Spring Kafka introduces the SeekToCurrentErrorHandler:

This allows implementations to seek all unprocessed topic/partitions so the current record (and the others remaining) will be retrieved by the next poll. The SeekToCurrentErrorHandler does exactly this.

https://docs.spring.io/spring-kafka/reference/htmlsingle/#_seek_to_current_container_error_handlers
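Applied to a batch listener, "seek to current" means rewinding every partition that appears in the failed batch to the first offset it contributed. The sketch below models that idea with hypothetical classes and methods (`Rec`, `ToyConsumer`, `seekToCurrent`, `postToRestService`) rather than the Spring Kafka API. A real implementation needs access to the `Consumer` so it can call `seek(TopicPartition, long)` for each partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT the Spring Kafka API) of "seek to current" for a batch listener:
// when processing the batch fails, rewind each partition in the batch to the first
// offset it contributed, so the next poll() returns the same records again.
public class SeekToCurrentBatchDemo {

    static final class Rec {
        final int partition;
        final long offset;
        final String value;

        Rec(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    static final class ToyConsumer {
        private final Map<Integer, List<String>> partitions;
        private final Map<Integer, Long> positions = new HashMap<>();

        ToyConsumer(Map<Integer, List<String>> partitions) {
            this.partitions = partitions;
            partitions.keySet().forEach(p -> positions.put(p, 0L));
        }

        // Returns up to maxPerPartition records from each partition and advances
        // each partition's in-memory position past them.
        List<Rec> poll(int maxPerPartition) {
            List<Rec> batch = new ArrayList<>();
            for (Map.Entry<Integer, List<String>> e : partitions.entrySet()) {
                int p = e.getKey();
                List<String> log = e.getValue();
                long pos = positions.get(p);
                long end = Math.min(pos + maxPerPartition, log.size());
                for (long o = pos; o < end; o++) {
                    batch.add(new Rec(p, o, log.get((int) o)));
                }
                positions.put(p, end);
            }
            return batch;
        }

        void seek(int partition, long offset) {
            positions.put(partition, offset);
        }
    }

    // Rewinds every partition that appears in the failed batch to its lowest offset.
    static void seekToCurrent(ToyConsumer consumer, List<Rec> failedBatch) {
        Map<Integer, Long> firstOffsets = new HashMap<>();
        for (Rec r : failedBatch) {
            firstOffsets.merge(r.partition, r.offset, Long::min);
        }
        firstOffsets.forEach((partition, offset) -> consumer.seek(partition, offset));
    }

    // Hypothetical stand-in for the REST call; here it always fails.
    static void postToRestService(List<Rec> batch) {
        throw new IllegalStateException("REST service unavailable");
    }

    static List<String> values(List<Rec> batch) {
        List<String> out = new ArrayList<>();
        for (Rec r : batch) {
            out.add(r.value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> data = new LinkedHashMap<>();
        data.put(0, List.of("a0", "a1", "a2"));
        data.put(1, List.of("b0", "b1"));
        ToyConsumer consumer = new ToyConsumer(data);

        List<Rec> batch = consumer.poll(2);
        try {
            postToRestService(batch);
        } catch (IllegalStateException ex) {
            // Instead of only logging, rewind so no record of the batch is skipped.
            seekToCurrent(consumer, batch);
        }
        List<Rec> retried = consumer.poll(2);
        System.out.println(values(batch) + " -> " + values(retried));
    }
}
```

With the simulated outage, `main` prints `[a0, a1, b0, b1] -> [a0, a1, b0, b1]`: the retried poll returns the whole failed batch instead of moving on to `a2`. One consequence of this design is that, if the REST service stays down, the same batch is replayed on every poll, so you would probably want some back-off or a retry limit on top of it.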