I'm using Spring AMQP 2.1.6 and Spring Boot 2.1.5 and I'm looking for the recommended way to configure spring-amqp to retry business exceptions for reactive components (Mono) with exponential backoff. For example:
@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
Mono<Void> mono = myService.doSomething(myMessage);
return mono;
}
I'd like spring-amqp to retry automatically if doSomething returns an error. Usually one can configure this for blocking RabbitListener's when setting up the container factory:
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));
Where retryInterceptor might be defined like this:
private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(backoffInitialInterval);
backOffPolicy.setMultiplier(backoffMultiplier);
backOffPolicy.setMaxInterval(backoffMaxInterval);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
retryTemplate.setBackOffPolicy(backOffPolicy);
StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
bean.setRetryOperations(retryTemplate);
return bean.getObject();
}
But the advice chain doesn't seem to be used for reactive RabbitListener's. This is probably because, if I understand it correctly, the RetryTemplate/ExponentialBackOffPolicy actually blocks the thread.
As a workaround I could of course do something like (switching to Kotlin because it's a bit easier):
@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
return myService.doSomething(myMessage)
.retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
log.info("Caught exception ${ctx.exception()}")
}
}
But I'd like this retry logic to be applied to for all instances of Mono returned from RabbitListener's. Is something like this possible or should you configure this another way when using reactive sequences from project reactor with spring-amqp?