I have a Spring Boot service that uses RabbitMQ.

I'm developing a test to ensure that a process keeps its transactional state across several transactions.

Example:

@Transactional
@RabbitListener
Queue Listener 1
1- receive message
2- call to Class 1 Method 1

@Transactional
Class 1 Method 1
1- Persist some data in data base
2- Publish a Message in a 2nd Queue

@Transactional
@RabbitListener
Queue Listener 2
1- read message
2- doStuff()

I would like my service to behave as follows.

If an exception is thrown in Queue Listener 2 (for example in doStuff()), I would like the Class 1 Method 1 transaction to be rolled back too. I would also like retry capabilities.

Configuring the transactions with Propagation.SUPPORTS or Propagation.REQUIRED does not seem to work either.

The Rabbit listeners are annotated as @Transactional.

For that, I configured the advice chain of my SimpleRabbitListenerContainerFactory with a StatefulRetryOperationsInterceptor.

I configured the transaction manager, which in my application is the JpaTransactionManager.

The Rabbit templates have channelTransacted set to true.

I also call jackson2JsonMessageConverter.setCreateMessageIds(true); on the message converter, to keep message IDs.

Here is my SimpleRabbitListenerContainerFactory config:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                               final PlatformTransactionManager platformTransactionManager) throws IOException {

        final SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
        final CachingConnectionFactory cachingConnectionFactory = cachingConnectionFactory();

        container.setConnectionFactory(cachingConnectionFactory);
        container.setMessageConverter(jackson2JsonMessageConverter());
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setConsecutiveIdleTrigger(1);
        container.setChannelTransacted(true);

        container.setTransactionManager(platformTransactionManager);

        container.setConcurrentConsumers(listenerConcurrency);
        container.setMaxConcurrentConsumers(maxListenerConcurrency);

        final StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor = RetryInterceptorBuilder.stateful()
                .retryPolicy(new SimpleRetryPolicy(maxAttempts, exceptionsTriggeringRetry()))
                .backOffPolicy(backOffPolicy())
                .build();

        container.setAdviceChain(statefulRetryOperationsInterceptor);
        configurer.configure(container, cachingConnectionFactory);

        return container;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {

        final ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper);
        jackson2JsonMessageConverter.setCreateMessageIds(true);

        return jackson2JsonMessageConverter;
    }

    @Bean
    public RabbitTemplate transactedRabbitTemplate(@Named("cachingConnectionFactory") final ConnectionFactory connectionFactory,
                                                  @Named("jackson2JsonMessageConverter") final Jackson2JsonMessageConverter jackson2MessageConverter){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2MessageConverter);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

The retry policy and its number of retries seem to be respected, but transaction integrity (with rollback) is not: a failure in a later transaction does not roll back the previous ones (i.e. a failure in the 3rd transaction does not trigger a rollback of the 2nd). Errors within the same transaction unit are, of course, rolled back properly.

EDIT

I also tried:

container.setAdviceChain(new TransactionInterceptor(platformTransactionManager, new Properties()));

but that does not seem to work either.

1 Answer

A transaction cannot span a queue like that; publishing to the queue is the end of the road for the first transaction.

Producers and consumers are isolated from each other.

Since you are using Rabbit transactions as well, Listener 2 won't even see the message until the first transaction commits.
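To make the ordering concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not Spring AMQP or RabbitMQ code: ToyQueue, ToyTransaction and every method name in it are hypothetical stand-ins I made up for illustration. It assumes only what is described above, namely that rows and published messages stay invisible until the first transaction commits, and that the consumer runs in a separate transaction afterwards.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    /** Toy model only; none of these types exist in Spring AMQP or RabbitMQ. */
    public class TransactedPublishSketch {

        /** Hypothetical stand-in for the 2nd queue on the broker. */
        static final class ToyQueue {
            private final Queue<String> visible = new ArrayDeque<>();

            void deliver(String message) {
                visible.add(message);
            }

            /** Returns the next visible message, or null if there is none. */
            String poll() {
                return visible.poll();
            }
        }

        /** Hypothetical stand-in for one transaction (database plus transacted channel). */
        static final class ToyTransaction {
            private final List<String> database;
            private final ToyQueue queue;
            private final List<String> pendingRows = new ArrayList<>();
            private final List<String> pendingMessages = new ArrayList<>();
            private boolean finished = false;

            ToyTransaction(List<String> database, ToyQueue queue) {
                this.database = database;
                this.queue = queue;
            }

            void persist(String row) {
                requireOpen();
                pendingRows.add(row);
            }

            void publish(String message) {
                requireOpen();
                pendingMessages.add(message);
            }

            /** Rows and messages become visible only here. */
            void commit() {
                requireOpen();
                database.addAll(pendingRows);
                pendingMessages.forEach(queue::deliver);
                finished = true;
            }

            /** Discards only this transaction's own pending work. */
            void rollback() {
                requireOpen();
                pendingRows.clear();
                pendingMessages.clear();
                finished = true;
            }

            private void requireOpen() {
                if (finished) {
                    throw new IllegalStateException("transaction already finished");
                }
            }
        }

        public static void main(String[] args) {
            List<String> database = new ArrayList<>();
            ToyQueue secondQueue = new ToyQueue();

            // Class 1 Method 1: persist data and publish inside the first transaction.
            ToyTransaction tx1 = new ToyTransaction(database, secondQueue);
            tx1.persist("row-1");
            tx1.publish("message-1");
            System.out.println("visible before commit: " + secondQueue.poll());
            tx1.commit();

            // Queue Listener 2: a new, separate transaction that starts after tx1 committed.
            System.out.println("visible after commit: " + secondQueue.poll());
            ToyTransaction tx2 = new ToyTransaction(database, secondQueue);
            try {
                throw new IllegalStateException("doStuff() failed");
            } catch (IllegalStateException e) {
                tx2.rollback(); // undoes tx2 only; tx1 is already committed
            }
            System.out.println("database after Listener 2 failure: " + database);
        }
    }

In this model, by the time Listener 2 can fail, the first transaction no longer exists, so there is nothing left for the failure to roll back. The row written by Class 1 Method 1 stays in the database.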