2
votes

I've used Spring Integration with JMS very successfully before, but we're now using it with RabbitMQ / AMQP and having some issues with error handling.

I have an int-amqp:inbound-channel-adapter with an errorChannel set up to receive any exception. There, an ErrorTransformer class inspects the failed Message's cause exception and then, depending on the type of exception, either:

  1. Suppresses the exception and transforms it into a JSON object that can go to an AMQP outbound-channel-adapter as a business reply explaining the failure. Here I want the original message consumed/ACKed.

  2. Or re-throws the causing exception to let RabbitMQ re-deliver the message a certain number of times.

I found that re-throwing caused the message to be re-delivered continuously. I then read about StatefulRetryOperationsInterceptorFactoryBean, so I added an advice chain to retry 3 times. After that I got an exception about a missing message-id, so I also added a 'MissingMessageIdAdvice' at the start of the advice chain.

Despite the advice, I still get continuous retries for a RuntimeException that is re-thrown from the errorChannel's ErrorTransformer. I'm publishing the message via the RabbitMQ admin console, just using the defaults. I'm not sure whether the lack of a message-id is what stops this from working; if so, how do I get a message to have an id? I'm also confused about the differences between:

A) The ConditionalRejectingErrorHandler (which I've set as the inbound adapter's error-handler), which allows me to provide a customFatalExceptionStrategy to implement isFatal(). I believe fatal=true means DISCARD, so the message is consumed and discarded, but how can I then still send an outbound failure message?

B) The errorChannel I have on the inbound adapter, which I'm using to inspect an exception and transform it into an outbound failure response message. Here I guess I could throw AmqpRejectAndDontRequeueException, but then why have the ConditionalRejectingErrorHandler as well? And will throwing AmqpRejectAndDontRequeueException work?

    <int-amqp:inbound-channel-adapter id="amqpInRequestPatternValuation" channel="requestAmqpIn" channel-transacted="true" transaction-manager="transactionManager"
        queue-names="requestQueue" error-channel="patternValuationErrorChannel" connection-factory="connectionFactory"
        receive-timeout="59000" concurrent-consumers="1"
        advice-chain="retryChain" error-handler="customErrorHandler" />

    <bean id="customErrorHandler" class="org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler">
        <constructor-arg ref="customFatalExceptionStrategy"/>
    </bean>

    <bean id="customFatalExceptionStrategy" class="abc.common.CustomFatalExceptionStrategy"/>

    <!-- ADVICE CHAIN FOR CONTROLLING NUMBER OF RE-TRIES before sending to DLQ (or discarding if no DLQ) without this any re-queued fatal message will retry forever  -->

    <util:list id="retryChain">
        <bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
            <constructor-arg>
                <bean class="org.springframework.retry.policy.MapRetryContextCache" />
            </constructor-arg>
        </bean>
        <ref bean="retryInterceptor" />
    </util:list>

    <bean id="retryInterceptor"
        class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
        <property name="retryOperations" ref="retryTemplate" />
        <property name="messageRecoverer" ref="messageRecoverer"/>
    </bean>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="retryPolicy" ref="simpleRetryPolicy" />
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="10000" />
            </bean>
        </property>
    </bean>

    <bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy">
        <property name="maxAttempts" value="3" />
    </bean>
Yes, and that makes your topic difficult to handle. Maybe it would be better to split it into several SO questions? - Artem Bilan
However, each question is tightly related, so I would be grateful for any suggestions or explanations. - Pete
I have simplified my post to just one question, and will post another regarding the separate points - Pete

2 Answers

0
votes

You have to use RejectAndDontRequeueRecoverer to stop re-delivery once the retries are exhausted:

 * MessageRecover that causes the listener container to reject
 * the message without requeuing. This enables failed messages
 * to be sent to a Dead Letter Exchange/Queue, if the broker is
 * so configured.
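
A minimal sketch of how that recoverer might be wired into the retry interceptor from the question. The bean ids `messageRecoverer`, `retryInterceptor`, and `retryTemplate` are taken from the question's configuration; the question references `messageRecoverer` without showing its definition, so the bean below is an assumption about what it should be:

```xml
<!-- Rejects the message without requeueing once the retry attempts are used up -->
<bean id="messageRecoverer"
      class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />

<bean id="retryInterceptor"
      class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="retryOperations" ref="retryTemplate" />
    <property name="messageRecoverer" ref="messageRecoverer" />
</bean>
```

If the queue has a dead letter exchange configured, the rejected message is routed there; otherwise the broker discards it.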

Yes, the messageId is important for that retry use-case.

You can inject a custom MessageKeyGenerator strategy to determine a unique key from the message if you can't supply the id manually when sending.
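
As a rough sketch, the key generator could be set on the interceptor factory bean through its `messageKeyGenerator` property. The class `abc.common.BodyHashMessageKeyGenerator` is hypothetical and not part of Spring AMQP; it stands for your own implementation of `org.springframework.amqp.rabbit.retry.MessageKeyGenerator` that returns the same key every time the same message is re-delivered:

```xml
<bean id="retryInterceptor"
      class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="retryOperations" ref="retryTemplate" />
    <property name="messageRecoverer" ref="messageRecoverer" />
    <!-- Hypothetical class: must derive a stable key from the message,
         for example from its body, so that re-deliveries map to the same retry state -->
    <property name="messageKeyGenerator">
        <bean class="abc.common.BodyHashMessageKeyGenerator" />
    </property>
</bean>
```

The key has to be stable across re-deliveries; a key that changes on every delivery would make each attempt look like a new message.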

0
votes

I never got around to posting the solution, so here it is.

Once I had configured the retry advice chain on the AMQP inbound channel adapter, which must include a messageRecoverer of type RejectAndDontRequeueRecoverer (which I believe is also the default), the important point I was missing was to ensure that senders include a message_id when they send a message. So when publishing via the RabbitMQ Admin Console, I needed to include the predefined message_id property and supply a value.

Using the 'MissingMessageIdAdvice' doesn't help (so I removed it), as it generates a different message_id on each re-delivery of the message. As a result the retry count never incremented, because each delivery was considered different from the last.
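
For illustration, the advice chain from the question then reduces to the stateful retry interceptor alone. This is a sketch that assumes the bean ids from the question and that every publisher sets the message_id property:

```xml
<!-- MissingMessageIdAdvice removed; retry state is keyed on the sender's message_id -->
<util:list id="retryChain">
    <ref bean="retryInterceptor" />
</util:list>
```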