I am using Spring Boot 2.1.9 with Spring Kafka 2.2.9.
If a message fails a configured number of times (defined in the AfterRollbackProcessor), the consumer stops processing that record. But if the consumer is restarted, it polls the same message again and processes it.
I don't want these messages to be re-polled after a restart. What is the best way to prevent that?
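For context, the consuming side looks roughly like this (a simplified sketch; the topic name and the processing call are placeholders rather than my exact code):

@Service
public class KafkaMessageReceiver {

    // When process(...) throws, the Kafka transaction rolls back and the
    // DefaultAfterRollbackProcessor re-seeks the record, so it is redelivered
    // until the configured retries are exhausted.
    @KafkaListener(topics = "my-topic")
    public void onMessage(String message) {
        process(message); // may throw and trigger a rollback
    }

    private void process(String message) {
        // business logic that can fail
    }
}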
Here is my configuration:
@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Poll Timeout
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimeoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        AfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                (record, exception) -> {
                    log.warn("failed to process kafka message (retries are exhausted). topic name:" + record.topic()
                            + " value:" + record.value());
                    messageProducer.saveFailedMessage(record, exception);
                }, retryMaxAttempts);
        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }
}
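For reference, the chained transaction manager injected into the factory above is defined roughly like this (a sketch; the second, non-Kafka transaction manager shown here, a JPA one, is only an example):

@Bean
public ChainedKafkaTransactionManager<String, String> chainedTM(
        KafkaTransactionManager<String, String> kafkaTransactionManager,
        JpaTransactionManager jpaTransactionManager) {
    // Chain the Kafka transaction with the database transaction so that
    // both commit or roll back together.
    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jpaTransactionManager);
}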
How can I achieve this?