I am using spring-kafka 2.2.2.RELEASE (org.apache.kafka:kafka-clients:jar:2.0.1) and Spring Boot 2.1.1. I am not able to execute a transaction because my listener never gets a partition assigned. I created the configuration suggested for an exactly-once consumer: I am trying to set up a transactional listener container with exactly-once processing.

I configured the producer and consumer with the transaction manager: the producer with a transactional id, the consumer with isolation.level=read_committed (a sketch of the consumer factory is shown after the container factory below).

@Bean(name = "producerFactory")
        public ProducerFactory<String, MyObject> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
            configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
            configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"txApp");
            DefaultKafkaProducerFactory<String, KafkaSerializer> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
            producerFactory.setTransactionIdPrefix("tx.");

                    return producerFactory;
        }



@Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager() {
        KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory());
        // ...
        return kafkaTransactionManager;
    }

@Bean(name="appTemplate")
    public KafkaTemplate<String,MyObject> kafkaTemplate(){
        KafkaTemplate<String, MyObject> kafkaTemplate = new KafkaTemplate<>(
                producerFactory());
        return kafkaTemplate;
    }

//Consumer

@Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                                                                          ConsumerFactory kafkaConsumerFactory,
                                                                          KafkaTransactionManager kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
        return factory;
    }
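
The kafkaConsumerFactory injected above is configured along these lines (a simplified sketch; deserializer details, trusted-package settings and error handling omitted):

@Bean
    public ConsumerFactory<String, MyObject> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ingest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // read only records from committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // offsets are committed by the container's transaction, not auto-committed
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }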

// In the consumer
   @KafkaListener(topics = "myTopic", groupId = "ingest", concurrency = "4")
    public void listener(@Payload MyObject message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws ExecutionException, InterruptedException {

...

// In my producer

myTemplate.executeInTransaction(t -> t.send(kafkaConfig.getTopicName(), myMessage));

I am expecting the message to arrive at my listener, but when I run the producer I get the error below:

22-07-2019 10:21:55.283 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete request.id= request.caller=  - [Consumer clientId=consumer-2, groupId=ingest] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment 
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:1657)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

1 Answer

Have a look at the server log; most likely you don't have enough replicas to support transactions (3 are required by default). You can lower the requirement to 1 if you are only testing.

See the broker properties transaction.state.log.replication.factor and min.insync.replicas. From the Kafka documentation:

The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

and

When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
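
For a single-broker test environment, the overrides could look like this in server.properties (a development-only sketch, not for production; transaction.state.log.min.isr is the companion minimum in-sync replica setting for the internal transaction topic):

# development-only overrides for a single-broker cluster
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=1

After a broker restart, the internal __transaction_state topic can be created and the producer's initTransactions() call should no longer time out.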