I have a Spring Boot service that uses RabbitMQ.
I'm developing a test to ensure that a process keeps its transactional state across several transactions.
Example:
@Transactional
@RabbitListener
Queue Listener 1
1- receive message
2- call to Class 1 Method 1
@Transactional
Class 1 Method 1
1- Persist some data in data base
2- Publish a Message in a 2nd Queue
@Transactional
@RabbitListener
Queue Listener 2
- read message
- doStuff()
I would like my service to behave as follows.
If an exception is thrown in Queue Listener 2 (for example in doStuff()), I would like the Class 1 Method 1 transaction to be rolled back too.
I would also like retry capabilities.
Configuring the transactions with Propagation.SUPPORTS or Propagation.REQUIRED does not seem to work either.
The Rabbit listeners are annotated as @Transactional.
For the retries, I configured the advice chain of my SimpleRabbitListenerContainerFactory with a StatefulRetryOperationsInterceptor.
I configured the transaction manager, which in my application is the JpaTransactionManager.
The Rabbit templates have channelTransacted set to true, and the message converter is configured with jackson2JsonMessageConverter.setCreateMessageIds(true); to keep message IDs.
Here is my SimpleRabbitListenerContainerFactory config:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        final SimpleRabbitListenerContainerFactoryConfigurer configurer,
        final PlatformTransactionManager platformTransactionManager) throws IOException {
    final SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
    final CachingConnectionFactory cachingConnectionFactory = cachingConnectionFactory();
    container.setConnectionFactory(cachingConnectionFactory);
    container.setMessageConverter(jackson2JsonMessageConverter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDefaultRequeueRejected(false);
    container.setConsecutiveIdleTrigger(1);
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(1);
    container.setChannelTransacted(true);
    container.setTransactionManager(platformTransactionManager);
    container.setConcurrentConsumers(listenerConcurrency);
    container.setMaxConcurrentConsumers(maxListenerConcurrency);
    final StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor = RetryInterceptorBuilder.stateful()
            .retryPolicy(new SimpleRetryPolicy(maxAttempts, exceptionsTriggeringRetry()))
            .backOffPolicy(backOffPolicy())
            .build();
    container.setAdviceChain(statefulRetryOperationsInterceptor);
    configurer.configure(container, cachingConnectionFactory);
    return container;
}

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    final ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper);
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

@Bean
public RabbitTemplate transactedRabbitTemplate(
        @Named("cachingConnectionFactory") final ConnectionFactory connectionFactory,
        @Named("jackson2JsonMessageConverter") final Jackson2JsonMessageConverter jackson2MessageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jackson2MessageConverter);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}
The retry policy and its number of retries seem to be respected, but transactional integrity across transactions does not seem to be enforced: a failure in a later transaction does not roll back the earlier ones (e.g. a failure in the 3rd transaction does not trigger a rollback of the 2nd).
Errors within the same transaction unit are, of course, rolled back properly.
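To make the observed behaviour concrete, here is a toy model in plain Java (no Spring, no RabbitMQ). All names in it are hypothetical and exist only for illustration. It assumes that each listener invocation runs in its own local transaction and that a transacted publish only becomes visible once the publishing transaction has committed. It also simplifies the message read in Listener 2, which happens outside the simulated transaction here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: no Spring, no RabbitMQ. All names here are hypothetical.
class TransactionChainSketch {

    /** Work executed inside one simulated transaction. */
    interface TxWork {
        void run(List<String> stagedRows, List<String> stagedMessages);
    }

    // Stand-ins for the database table and the 2nd queue.
    static final List<String> database = new ArrayList<>();
    static final Deque<String> secondQueue = new ArrayDeque<>();

    /**
     * Runs the work as one independent transaction. Rows and messages are
     * staged and only become visible if the work completes without throwing.
     * Returns true on commit, false on rollback.
     */
    static boolean inTransaction(TxWork work) {
        List<String> stagedRows = new ArrayList<>();
        List<String> stagedMessages = new ArrayList<>();
        try {
            work.run(stagedRows, stagedMessages);
        } catch (RuntimeException e) {
            return false; // rollback: staged changes are discarded
        }
        database.addAll(stagedRows);        // commit the "database" writes
        secondQueue.addAll(stagedMessages); // publish becomes visible on commit
        return true;
    }

    /** Queue Listener 1 -> Class 1 Method 1: persist, then publish. */
    static boolean listener1(String payload) {
        return inTransaction((rows, messages) -> {
            rows.add(payload);
            messages.add(payload);
        });
    }

    /** Queue Listener 2: read the message, then fail in doStuff(). */
    static boolean listener2() {
        String message = secondQueue.poll();
        return inTransaction((rows, messages) -> {
            throw new IllegalStateException("doStuff() failed for " + message);
        });
    }

    public static void main(String[] args) {
        boolean firstCommitted = listener1("order-1");
        boolean secondCommitted = listener2();
        System.out.println("listener 1 committed: " + firstCommitted);
        System.out.println("listener 2 committed: " + secondCommitted);
        System.out.println("rows still in database: " + database);
    }
}
```

In this model the row written by Listener 1 is still present after Listener 2 fails, because the first transaction has already committed by the time the second message can be consumed. This is the behaviour I am seeing and would like to avoid.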
EDIT
I also tried:
container.setAdviceChain(new TransactionInterceptor(platformTransactionManager, new Properties()));
but it does not seem to work either.