1
votes

I'm using Spring AMQP 2.1.6 and Spring Boot 2.1.5 and I'm looking for the recommended way to configure spring-amqp to retry business exceptions for reactive components (Mono) with exponential backoff. For example:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

I'd like spring-amqp to retry automatically if doSomething returns an error. Usually one can configure this for blocking RabbitListener's when setting up the container factory:

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));

Where retryInterceptor might be defined like this:

private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(backoffInitialInterval);
    backOffPolicy.setMultiplier(backoffMultiplier);
    backOffPolicy.setMaxInterval(backoffMaxInterval);

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
    retryTemplate.setBackOffPolicy(backOffPolicy);

    StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
    bean.setRetryOperations(retryTemplate);
    return bean.getObject();
}

But the advice chain doesn't seem to be used for reactive RabbitListener's. This is probably because, if I understand it correctly, the RetryTemplate/ExponentialBackOffPolicy actually blocks the thread.

As a workaround I could of course do something like (switching to Kotlin because it's a bit easier):

@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
    return myService.doSomething(myMessage)
                    .retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
            log.info("Caught exception ${ctx.exception()}")
        }
}

But I'd like this retry logic to be applied to for all instances of Mono returned from RabbitListener's. Is something like this possible or should you configure this another way when using reactive sequences from project reactor with spring-amqp?

1

1 Answers

1
votes

It is really better to apply retry logic into your reactive sequence, similar way you do with the retryExponentialBackoff(). Just because the Reactive Streams execution doesn't happen on the same thread we can apply that Retrytemplate for the myListener().

Right now the logic internally is like this:

private static class MonoHandler {

    static boolean isMono(Object result) {
        return result instanceof Mono;
    }

    @SuppressWarnings("unchecked")
    static void subscribe(Object returnValue, Consumer<? super Object> success,
            Consumer<? super Throwable> failure) {

        ((Mono<? super Object>) returnValue).subscribe(success, failure);
    }

}

That Consumer<? super Throwable> failure does this:

private void asyncFailure(Message request, Channel channel, Throwable t) {
    this.logger.error("Future or Mono was completed with an exception for " + request, t);
    try {
        channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
    }
    catch (IOException e) {
        this.logger.error("Failed to nack message", e);
    }
} 

So, we don't have any way to to initiate that RetryTemplate at all, but at the same time with an explicit basicNack() we have a natural retry with the re-fetching the same message from the RabbitMQ back.

We could probably apply a Reactor retry for that Mono internally, but it doesn't look like RetryOperationsInterceptor can simply be converted to the Mono.retry().

So, in other words, the RetryOperationsInterceptor is a wrong way for reactive processing. Use Mono.retry() explicitly in your own code.

You may expose some common utility method and apply it as a Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer) whenever you have a reactive return for the @RabbitListener method.