4
votes

I use the Spring Kafka API to implement a Kafka consumer with manual offset management:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

Here, I want the consumer to commit the offset only if someCondition holds. Otherwise the consumer should sleep for some time and read the same message again.

Kafka Configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

With the current configuration, if someCondition == false, the consumer doesn't commit the offset but still goes on to read the next messages. Is there a way to make the consumer reread a message if it wasn't acknowledged?

3 Answers

8
votes

As @Gary already pointed out, you are headed in the right direction: seek() is the way to do it. I couldn't find a code example for it when I ran into this problem today, so here is the code for anyone who wants to solve it.

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    private ConsumerSeekCallback consumerSeekCallback;


    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {

        if (/*some condition*/) {
            //process
            acknowledgment.acknowledge(); //send ack
        } else {

            // seek back to the failed record so the next poll delivers it again
            consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());

        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

}
4
votes

You can stop and restart the container and it will be re-sent.

With the upcoming 1.1 release, you can seek to the required offset and it will be resent.

But you will still see later messages first if they have already been retrieved, so you will have to discard those too.

The second milestone has that feature and we expect it to be released next week.
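
To make the point about already-retrieved messages concrete, here is a toy model in plain Java. It is only a sketch: OffsetModel, poll, commit, seek and run are hypothetical names, not the Kafka or Spring Kafka API. It assumes a single partition with offsets 0..3 and a record at offset 1 that fails on its first attempt. It shows that the in-memory position moves on regardless of commits, and that after a seek you also have to drop the rest of the batch that was already fetched.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition's offset bookkeeping (hypothetical, not the Kafka client). */
final class OffsetModel {
    private final int logEnd;      // number of records in the partition
    private int position = 0;      // next offset poll() returns; lives in memory only
    private int committed = 0;     // stored for the group; only consulted on restart or rebalance

    OffsetModel(int logEnd) { this.logEnd = logEnd; }

    /** Returns up to max offsets starting at the current position and advances it. */
    List<Integer> poll(int max) {
        List<Integer> offsets = new ArrayList<>();
        while (offsets.size() < max && position < logEnd) {
            offsets.add(position++);
        }
        return offsets;
    }

    void commit(int offset) { committed = offset + 1; }

    void seek(int offset) { position = offset; }

    /** Runs the listener loop and returns the offsets in the order they were delivered. */
    public static List<Integer> run(boolean seekOnFailure) {
        OffsetModel consumer = new OffsetModel(4);
        List<Integer> delivered = new ArrayList<>();
        boolean failedOnce = false;
        while (true) {
            List<Integer> batch = consumer.poll(3);
            if (batch.isEmpty()) {
                break;
            }
            for (int offset : batch) {
                delivered.add(offset);
                boolean ok = offset != 1 || failedOnce; // offset 1 fails the first time only
                if (ok) {
                    consumer.commit(offset);
                } else {
                    failedOnce = true;
                    if (seekOnFailure) {
                        consumer.seek(offset);
                        break; // discard the records that were already fetched after it
                    }
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [0, 1, 2, 3]    -> offset 1 is never seen again
        System.out.println(run(true));  // [0, 1, 1, 2, 3] -> offset 1 is redelivered
    }
}
```

Note that in the first run the later commit of offset 2 also moves the committed offset past the unacknowledged record, so even a restart would not bring it back.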

0
votes

You can try nack(long sleep) to get this behavior; its only parameter is the sleep interval in milliseconds.

From Spring for Apache Kafka documentation:

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

Applying this to a code example, we get:

@Component
@Slf4j
public class ExampleConsumer {
    private boolean nonError = false;
    
    @KafkaListener(topics = "topic_name")
    private void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
        log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), record.offset());
        
        if (nonError) {
            log.info("ACK: {}", record.offset());
            ack.acknowledge(); //send ack
            if (record.offset() % 2 == 0)
                nonError = false; // force a nack on the next record
        } else {
            ack.nack(0); // immediate seek - no sleep time for consumer
            nonError = true;
        }
    }
}

The configuration looks as follows:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    private ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...

        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
        if (this.factory == null) {
            this.factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        }

        return this.factory;
    }
}

The example produces:

2020-07-31 17:05:19.275  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19

Note: KafkaEventPojo is my own POJO that holds the record data stored in Kafka according to our internal structure, so you can replace it with whatever type you need. Also, the code above demonstrates nack for a single-record listener. If you need the batch variant, the documentation referenced above has examples.
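
For the batch case, the following is only a plain-Java model of what nack(int index, long sleep) does as I understand it from the documentation: the records before the index are treated as processed and their offsets are committed, while the record at the index and everything after it are redelivered after the sleep. BatchNackModel and splitAtNack are hypothetical names, and the sketch assumes the whole batch comes from a single partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Models how a batch is split by a nack at a given index (hypothetical, not Spring Kafka). */
final class BatchNackModel {

    /**
     * Returns two lists: the offsets before the index (committed) and the
     * offsets from the index onward (redelivered on the next poll).
     */
    public static List<List<Long>> splitAtNack(List<Long> batchOffsets, int index) {
        if (index < 0 || index >= batchOffsets.size()) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        List<Long> committed = new ArrayList<>(batchOffsets.subList(0, index));
        List<Long> redelivered = new ArrayList<>(batchOffsets.subList(index, batchOffsets.size()));
        return List.of(committed, redelivered);
    }

    public static void main(String[] args) {
        // Batch with offsets 20..24; the listener fails on the third record (index 2).
        List<List<Long>> result = splitAtNack(List.of(20L, 21L, 22L, 23L, 24L), 2);
        System.out.println("committed:   " + result.get(0)); // [20, 21]
        System.out.println("redelivered: " + result.get(1)); // [22, 23, 24]
    }
}
```

With index 0 nothing is committed and the whole batch comes back.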