2
votes

I am using Spring Boot 2.1.9 with Spring Kafka 2.2.9.

If a message fails a number of times (as defined in the afterRollbackProcessor), the consumer stops polling that record. But if the consumer is restarted, it re-polls the same message and processes it again.

But I don't want the messages to be re-polled again. What is the best way to prevent this?

here is my config

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka bootstrap servers (comma-separated host:port list)
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Consumer group identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Maximum delivery attempts before a record is handed to the recoverer
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Retry interval in milliseconds (injected; not referenced in this class —
    // NOTE(review): presumably used elsewhere or intended for a BackOff; confirm)
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Number of concurrent listener containers
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Container poll timeout in milliseconds
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    // Where to start consuming when no committed offset exists.
    // Fix: dropped the redundant field initializer — the @Value default
    // ("earliest") already covers the missing-property case.
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset;

    // Maximum records returned by a single poll()
    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    // Maximum delay between polls before the consumer is considered failed
    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    // Session timeout for group-membership heartbeating (fix: typo in field name)
    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimeoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);


    /**
     * Builds the transactional listener container factory. Each record is processed
     * in a transaction managed by {@code chainedTM}; records that still fail after
     * {@code retryMaxAttempts} rollbacks are passed to the recoverer, which persists
     * them via {@code messageProducer.saveFailedMessage(...)}.
     *
     * @param chainedTM chained Kafka/DB transaction manager applied per record
     * @param messageProducer sink used to persist messages whose retries are exhausted
     * @return the configured container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);

        // Recoverer invoked once retries are exhausted.
        // Fix: parameterized SLF4J logging with the exception attached as the last
        // argument (the original concatenated strings and dropped the stack trace),
        // and corrected the "exausted" typo in the message.
        AfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                (record, exception) -> {
                    log.warn("failed to process kafka message (retries are exhausted). topic name: {} value: {}",
                            record.topic(), record.value(), exception);
                    messageProducer.saveFailedMessage(record, exception);
                }, retryMaxAttempts);

        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    /**
     * Consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return a {@code DefaultKafkaConsumerFactory} for {@code String} keys/values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw consumer properties. Auto-commit is disabled (offsets are committed via
     * the transaction manager) and {@code read_committed} isolation hides records
     * from aborted transactions.
     *
     * @return the consumer configuration map
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        // NOTE(review): the map is built once and handed to the factory; a plain
        // HashMap would suffice, but ConcurrentHashMap is kept for compatibility
        // (it is also null-hostile, so no entry below may be null).
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}

How can I achieve this?

1

1 Answer

3
votes

Set the commitRecovered property to true and inject a KafkaTemplate configured with the same producer factory as the transaction manager.

/**
 * {@inheritDoc}
 * Set to true and the container will run the
 * {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
 * if a record is skipped and recovered, we will send its offset to the transaction.
 * Requires a {@link KafkaTemplate}.
 * @param commitRecovered true to process in a transaction.
 * @since 2.2.5
 * @see #isProcessInTransaction()
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
@Override
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
    // Delegates unchanged to the base class; the override exists only to enhance the javadoc.
    super.setCommitRecovered(commitRecovered);
}

EDIT

Here's the logic in process...

    // doSeeks(...) returns true when the failed record is being skipped (recovered)
    // rather than re-sought; the offset is only committed when commitRecovered is
    // enabled and a transactional KafkaTemplate is available.
    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
            getSkipPredicate((List) records, exception), this.logger)
                && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {

        // if we get here it means retries are exhausted and we've skipped

        // Send offset + 1 for the skipped record to the transaction so the skip is
        // committed atomically and the record is not redelivered after a restart.
        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }

EDIT2

In 2.2.x, the property is

/**
 * Set to true to run the {@link #process(List, Consumer, Exception, boolean)}
 * method in a transaction. Requires a {@link KafkaTemplate}.
 * @param processInTransaction true to process in a transaction.
 * @since 2.2.5
 * @see #process(List, Consumer, Exception, boolean)
 * @see #setKafkaTemplate(KafkaTemplate)
 */
public void setProcessInTransaction(boolean processInTransaction) {
    // Plain flag setter; the transactional offset commit itself happens in process(...).
    this.processInTransaction = processInTransaction;
}