
I am using Spring Boot and spring-kafka (see pom.xml for the exact versions) with Apache Kafka 2.0.1, and I am hitting a strange issue when using transactions through the listener container.

The issue occurs when Kafka returns an error while the application publishes a new message in a consume-process-produce cycle. To reproduce it, I set the topic's min.insync.replicas (4) higher than the number of available brokers (3).

Because of factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1)), I expected every failed attempt to roll back the transaction and the message to be retried indefinitely.

As output.log shows, when processing fails on the first attempt, the listener container rolls back the transaction and then receives the same message again (as expected).

However, on the second attempt to process and publish the new message, the listener blocks forever on kafkaTemplate.send(record).get() and no transaction rollback is initiated. If at that point I set min.insync.replicas back to an acceptable value (<= the number of brokers), processing continues normally and the transaction is committed.
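As a diagnostic aid only, the unbounded get() can be replaced with a bounded wait so that a stuck send surfaces as an exception in the listener instead of a silent hang. The sketch below uses only java.util.concurrent; the class name BoundedSendWait, the helper awaitSend, and the 200 ms timeout are hypothetical, and the never-completed CompletableFuture merely stands in for the future returned by kafkaTemplate.send(record). This does not address a hang inside the Kafka client itself (for example while aborting the transaction); it only bounds the wait in listener code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedSendWait {

    // Hypothetical helper: waits for a send result for at most the given time
    // and converts every failure mode into an unchecked exception.
    public static <T> T awaitSend(Future<T> sendFuture, long timeout, TimeUnit unit) {
        try {
            return sendFuture.get(timeout, unit);
        } catch (TimeoutException e) {
            // The future did not complete in time; fail instead of blocking forever.
            throw new IllegalStateException("Send not acknowledged within " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            // The send completed exceptionally; keep the original cause.
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for send", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send whose future never completes.
        Future<String> stuck = new CompletableFuture<>();
        try {
            awaitSend(stuck, 200, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println("Gave up: " + e.getMessage());
        }
    }
}
```

In the listener, the equivalent call would be kafkaTemplate.send(record).get(timeout, unit), since the returned future supports the standard timed get.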

To reproduce it, publish a string message to the topic, raise the topic's min.insync.replicas to a value that cannot be satisfied, and run the application.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>
    <groupId>com.project.test</groupId>
    <artifactId>kafka-transactions</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Application.java

package com.project.test;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableKafka
@EnableTransactionManagement
@SpringBootApplication
public class Application {

    private String kafkaBootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, "1");

        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
        producerFactory.setTransactionIdPrefix("tx.");

        return producerFactory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        KafkaTransactionManager<String, String> manager = new KafkaTransactionManager<String, String>(producerFactory());
        return manager;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-kafka-tx-consumer");

        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);

        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level.INFO);

        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));

        return factory;
    }

}

KafkaTransactions.java

package com.project.test;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaTransactions {

    private static final Logger log = LoggerFactory.getLogger(KafkaTransactions.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "test-kafka-transactions")
    public void messageListener(String value) throws Exception {

        log.info("Received message");

        ProducerRecord<String, String> record = new ProducerRecord<>("test-kafka-transactions", null, value);

        Thread.sleep(2000);

        log.info("Adding new message on Kafka transaction (commit is handled by the Listener Container)");
        kafkaTemplate.send(record).get();

        log.info("Processed message");
    }

}

output.log

2019-04-04 10:21:25.741  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-kafka-transactions-3, test-kafka-transactions-2, test-kafka-transactions-1, test-kafka-transactions-0]
2019-04-04 10:23:03.240  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Received message
2019-04-04 10:23:05.245  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Adding new message on Kafka transaction (commit is handled by the Listener Container)
2019-04-04 10:23:05.247  INFO 19316 --- [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: gvDhAK6YRsWzh2FrxukHnA
2019-04-04 10:23:05.267  WARN 19316 --- [kafka-producer-network-thread | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1, transactionalId=tx.test-kafka-tx-consumer.test-kafka-transactions.3] Got error produce response with correlation id 12 on topic-partition test-kafka-transactions-3, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS
2019-04-04 10:23:05.372 ERROR 19316 --- [kafka-producer-network-thread | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='1' and payload='1' to topic test-kafka-transactions:

org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

2019-04-04 10:23:05.372 ERROR 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.project.test.KafkaTransactions.messageListener(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1600(KafkaMessageListenerContainer.java:384) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$3.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1128) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_191]
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.project.test.KafkaTransactions.messageListener(KafkaTransactions.java:29) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    ... 17 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$0(KafkaTemplate.java:396) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-2.0.1.jar:na]
    ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

2019-04-04 10:23:05.372  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Received message
2019-04-04 10:23:07.391  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Adding new message on Kafka transaction (commit is handled by the Listener Container)

I reproduced it; Gist here. It appears to be a problem in the kafka-clients library: the thread is "stuck" waiting for the result of an abort request. The server log was filling up with errors too. Perhaps setting min.insync.replicas higher than the number of replicas is simply an invalid configuration. What happens if, instead of increasing it, you bring down one of the cluster members? (I don't currently have a cluster set up, otherwise I would have tested it.) - Gary Russell
Thanks for answering so quickly, Gary! I have already tried another approach: I set the max message size to just 10 bytes and sent a larger message, but the result was exactly the same (only the cause of the exception was the message size violation instead of not enough replicas). I had the same concern as you about min.insync.replicas, which is why I also tested with the max message size. - nickorfas
I've done some more investigation and I am convinced there is a problem in Kafka itself. If I throw an exception from the listener method (after sending a message), everything works fine, so I believe the framework is doing everything it should. However, when the send fails (e.g. max size), we get the hang. On a whim, I tried changing the logic to always close() the producer after an abortTransaction() to see if it would help. It does not. Now we get "Timeout expired while initializing transaction state" on beginTransaction() during the retries. Still investigating... - Gary Russell
Thanks for keeping me updated and for opening a bug at Kafka! By the way, I did try turning off 2 of the 3 brokers in my Kafka cluster, but in that case the issue cannot be reproduced, since the listener container can't even initialize the transaction due to not enough replicas. Anyway, I saw the bug you submitted at Kafka and it is totally clear, so we are OK :-) Hope to have a fix soon! - nickorfas

1 Answer


I was able to reproduce it after taking Spring out of the picture.

I submitted a bug report: KAFKA-8195.