0
votes

We are using spring-kafka 1.2.2.RELEASE.

What we want
1. As soon as a message is consumed and processed successfully, offset is committed in spring-kafka.
I am using Manaul Commit/Acknowledgement for it, it is working fine.

2. In case of any exception we want spring-kafka to resend the same message.
We are throwing RunTime exception on any system error, which was logged by spring-kafka and never committed.
This is fine as we don't want it to commit, but that message stays in spring-kafka and never comes back unless we restart the service. On restart message comes back and executes once again and then stay in spring-kafka

What we tried
1. I have tried both ErrorHandler and RetryingMessageListenerAdapter, but in both cases we have to code in service how to process the message again

This is my consumer

public class MyConsumer{
    @KafkaListener
    public void receive(...){
        // application logic to return success/failure
        if(success){
            acknowledgement.acknowledge();
        }else{
            throw new RunTimeException();
        }
    }
} 

Also I have following configurations for container factory

factory.getContainerProperties().setErrorHandler(new ErrorHandler(){
    @Override
    public void handle(...){
        throw new RunTimeException("");
    }
});

While executing the flow, control is coming inside both first to receive and then handle method. After that service waits for new message. However I was expecting, since we threw an exception, and message is not committed, same message should land in receive method again.

Is there any way, we can tell spring kafka "do not commit this message and send it again asap?"

2

2 Answers

1
votes

1.2.x is no longer supported; 1.x users are recommended to upgrade to at least 1.3.x (currently 1.3.8) because of its much simpler threading model, thanks to KIP-62.

The current version is 2.2.2.

2.0.1 introduced the SeekToCurrentErrorHandler which re-seeks the failed record so that it is redelivered.

With earlier versions, you had to stop and restart the container to redeliver a failed message, or add retry to the listener adapter.

I suggest you upgrade to the newest possible release.

0
votes

Unfortunately version available for us to use is 1.3.7.RELEASE.

I have tried implementing the ConsumerSeekAware interface. Below is how I am doing it and I can see message delivering repreatedly

Consumer

public class MyConsumer implements ConsumerSeekAware{
    private ConsumerSeekCallback consumerSeekCallback;
    if(condition) {
            acknowledgement.acknowledge();
        }else {
            consumerSeekCallback.seek((String)headers.get("kafka_receivedTopic"),
                    (int) headers.get("kafka_receivedPartitionId"),
                    (int) headers.get("kafka_offset"));
        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> arg0, ConsumerSeekCallback arg1) {
        LOGGER.debug("onIdleContainer called");
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> arg0, ConsumerSeekCallback arg1) {
        LOGGER.debug("onPartitionsAssigned called");
    }
}

Config

public class MyConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Set server, deserializer, group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    @Bean
    public MyConsumer receiver() {
        return new MyConsumer();
    }
}