You may try using nack(long sleep)
where the only parameter represents sleep interval ms
to achieve the mentioned behavior.
From Spring for Apache Kafka documentation:
Starting with version 2.3, the Acknowledgment interface has two
additional methods nack(long sleep) and nack(int index, long sleep).
The first one is used with a record listener, the second with a batch
listener. Calling the wrong method for your listener type will throw
an IllegalStateException.
Applying the above information into code example we get:
@Component
@Slf4j
public class ExampleConsumer {
private boolean nonError = false;
@KafkaListener(topics = "topic_name")
private void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), record.offset());
if (nonError) {
log.info("ACK: {}", offset);
ack.acknowledge();
if (offset % 2 == 0)
nonError = false;
} else {
ack.nack(0);
nonError = true;
}
}
}
The configuration looks as follows:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
@Bean
public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
if (this.factory == null) {
this.factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
}
return this.factory;
}
The example produces:
2020-07-31 17:05:19.275 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19
Note: KafkaEventPojo
is my implementation of POJO which holds record data stored in Kafka following our internal structure - so you may alter this by whatever you need. Additionally, the code above demonstrates the usage of nack for single record listener. In case you need batched option, you can find examples how to do so in the provided documentation.