I've used Spring Integration with JMS very successfully before, but we're now using with RabbitMQ / AMQP and having some issues with error handling.
I have an int-amqp:inbound-channel-adapter with with a errorChannel set up to receive any exception, here an ErrorTransformer class inspects the failed Message's cause exception. Then depending on the type of exception either :-
suppresses the exception and transforms into a JSON object that can go to AMQP outbound-channel-adapter as a business reply explaining the failure. Here I want the original message consumed/ACKed.
Or Re-throws the causing exception to let RabbitMQ re-deliver the message, a certain number of times.
I found that re-throwing caused the message to be re-delivered continuously, I then read about StatefulRetryOperationsInterceptorFactoryBean, so added an advice chain to retry 3 times, then I got exception about no message-id, so also added a 'MissingMessageIdAdvice' at the start of the advice chain.
Despite the advice, I still get continuous re-tries for a RuntimeException that is re-thrown from errorChannel's ErrorTransformer. I'm publishing the message via RabbitMQ admin just using the defaults. Not sure if the lack of a message-id is making this not work, if so how do I get a message to have an id ? I'm confused about the differences between the :-
A) ConditionalRejectingErrorHandler (I've set as the inbound adapter's error-handler) which allows me to provide a customFatalExceptionStrategy to implement isFatal(). Where I believe fatal=true (means DISCARD) and message is consumed and discarded, but how can I still send an outbound failure message ?
B) And the errorChannel I have on the inbound adapter which I'm using to inspect an Exception and transform into a outbound fail response message. Here I guess I could throw AmqpRejectAndDontRequeueException, but then why have the ConditionalRejectingErrorHandler as well ? and will thorwing AmqpRejectAndDontRequeueException work
<int-amqp:inbound-channel-adapter id="amqpInRequestPatternValuation" channel="requestAmqpIn" channel-transacted="true" transaction-manager="transactionManager"
queue-names="requestQueue" error-channel="patternValuationErrorChannel" connection-factory="connectionFactory"
receive-timeout="59000" concurrent-consumers="1"
advice-chain="retryChain" error-handler="customErrorHandler" />
<bean id="customErrorHandler" class="org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler">
<constructor-arg ref="customFatalExceptionStrategy"/>
</bean>
<bean id="customFatalExceptionStrategy" class="abc.common.CustomFatalExceptionStrategy"/>
<!-- ADVICE CHAIN FOR CONTROLLING NUMBER OF RE-TRIES before sending to DLQ (or discarding if no DLQ) without this any re-queued fatal message will retry forever -->
<util:list id="retryChain">
<bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
<constructor-arg>
<bean class="org.springframework.retry.policy.MapRetryContextCache" />
</constructor-arg>
</bean>
<ref bean="retryInterceptor" />
</util:list>
<bean id="retryInterceptor"
class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
<property name="retryOperations" ref="retryTemplate" />
<property name="messageRecoverer" ref="messageRecoverer"/>
</bean>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy" ref="simpleRetryPolicy" />
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="10000" />
</bean>
</property>
</bean>
<bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>