1
votes

I have a spring integration IntegrationFlow that's defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();

And serviceActivatorBean looks like this:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator

    public void myMethod(Collection<MyEvent> events) {
        ....
    }    
}

If myMethod throws an exception it will be logged but no retry will happen. I've tried to change the IntegrationFlow to this:

RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .adviceChain(advice)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();

But then I a log message like this (an retries won't happen):

2017-06-30 13:18:10.611 WARN 88706 --- [erContainer#1-2] o.s.i.h.a.RequestHandlerRetryAdvice : This advice org.springframework.integration.handler.advice.RequestHandlerRetryAdvice can only be used for MessageHandlers; an attempt to advise method 'invokeListener' in 'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1' is ignored

How can I configure this IntegrationFlow to behave the same way a RabbitListener would? I.e. let RabbitMQ publish the messages again.

1

1 Answers

1
votes

Use a retry interceptor in the adapter's advice chain instead of the RequestHandlerRetryAdvice - that is for consuming endpoints, as the message says.