I have a Spring Cloud Stream application that receives messages from RabbitMQ using the Rabbit binder, updates my database, and sends one or more messages. My application can be summarized as this demo app:

The problem is that @Transactional doesn't seem to work (or at least that's my impression): if an exception is thrown, the database is rolled back, but the messages are still sent, even though the consumer and producer are configured as transacted by default.

What I want to achieve is this: when an exception occurs, the consumed message goes to the DLQ after being retried, the database is rolled back, and no messages are sent.

How can I achieve this?

This is the output of the demo application when I send a message to the my-input exchange:

2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=my-input, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=my-input.my-group, amqp_redelivered=false, id=006f733f-5eab-9119-347a-625570383c47, amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=my-input, receivedRoutingKey=#, deliveryTag=2, consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, consumerQueue=my-input.my-group]), contentType=application/json, timestamp=1611063077789}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
    at com.example.demo.MyListener.process(DemoApplication.kt:46)
    at com.example.demo.MyListener$$FastClassBySpringCGLIB$$4381219a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 29 more

message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto

1 Answer

Since you are republishing the failed message to the DLQ, from RabbitMQ's perspective the delivery was successful: the original message is acknowledged and removed from the queue, and the Rabbit transaction is committed, including any messages you sent before the exception.

You can't do what you want with republishToDlq.

It will work if you use the normal DLQ mechanism (republishToDlq=false, whereby the broker sends the original message to the DLQ) instead of republishing with the extra metadata.
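
A configuration sketch of that option, assuming the consumer binding is named input (as in a default Processor) and that you want the binder to declare the DLQ for you. Adjust the binding name to your own application; this is an illustration, not a tested configuration for your project.

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:                        # assumed binding name
            consumer:
              transacted: true
              auto-bind-dlq: true       # binder declares a DLQ and the dead-letter routing
              republish-to-dlq: false   # the broker dead-letters the rejected original message
```

With this setup the exception propagates once retries are exhausted, the transaction rolls back, and the broker moves the unmodified original message to the DLQ, so no exception metadata headers are added.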

If you want to republish with metadata, you could manually publish to the DLQ with a non-transactional RabbitTemplate (so the DLQ publish doesn't get rolled back with the other publishes).
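
To make the difference between the two cases concrete, here is a toy model in plain Java. It is not Spring or RabbitMQ API: Broker, TransactedChannel and deliver are hypothetical names that only mimic "publishes are buffered until commit" and "commit on normal return, roll back on exception". It also does not model acknowledgement of the original message.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types exist in Spring AMQP.
class DlqTransactionSketch {

    /** Messages that actually reached the simulated broker. */
    static final class Broker {
        final List<String> delivered = new ArrayList<>();
    }

    /** Buffers publishes; they become visible only on commit(). */
    static final class TransactedChannel {
        private final Broker broker;
        private final List<String> pending = new ArrayList<>();
        TransactedChannel(Broker broker) { this.broker = broker; }
        void publish(String msg) { pending.add(msg); }
        void commit() { broker.delivered.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
    }

    /** Runs the listener; commits on normal return, rolls back on exception. */
    static void deliver(TransactedChannel tx, Runnable listener) {
        try {
            listener.run();
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
        }
    }

    /** Case 1: the failure is recovered by republishing on the transacted channel. */
    static List<String> republishInsideTransaction() {
        Broker broker = new Broker();
        TransactedChannel tx = new TransactedChannel(broker);
        deliver(tx, () -> {
            try {
                tx.publish("output: HELLO WORLD");
                throw new RuntimeException("MyError");
            } catch (RuntimeException e) {
                // Exception is swallowed, so the listener appears to succeed.
                tx.publish("dlq: hello world");
            }
        });
        return broker.delivered;
    }

    /** Case 2: publish to the DLQ outside the transaction, then rethrow. */
    static List<String> republishOutsideTransactionAndRethrow() {
        Broker broker = new Broker();
        TransactedChannel tx = new TransactedChannel(broker);
        deliver(tx, () -> {
            try {
                tx.publish("output: HELLO WORLD");
                throw new RuntimeException("MyError");
            } catch (RuntimeException e) {
                broker.delivered.add("dlq: hello world"); // non-transactional publish
                throw e;                                   // rolls back the output publish
            }
        });
        return broker.delivered;
    }

    public static void main(String[] args) {
        System.out.println(republishInsideTransaction());
        // prints [output: HELLO WORLD, dlq: hello world]
        System.out.println(republishOutsideTransactionAndRethrow());
        // prints [dlq: hello world]
    }
}
```

The first case corresponds to what you are seeing: the output message is committed together with the DLQ publish. The second case is the behaviour you want, which is why the DLQ publish has to bypass the transaction and the exception has to be rethrown.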

EDIT

Here is an example of how to do what you need.

A few things to note:

  1. We have to add an error handler to rethrow the exception.
  2. We have to move retries to the listener container instead of the binder; otherwise, the retries will occur within the transaction and if retries are successful, multiple messages would be deposited on the output queue.
  3. For stateful retry to work, we must be able to uniquely identify each message; the simplest solution is to have the sender set a unique message_id property (e.g. a UUID).
@SpringBootApplication
@EnableBinding(Processor.class)
public class So65792643Application {

    public static void main(String[] args) {
        SpringApplication.run(So65792643Application.class, args);
    }

    @Autowired
    Processor processor;

    @StreamListener(Processor.INPUT)
    public void in(Message<String> in) {
        System.out.println(in.getPayload());
        processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
        int attempt = RetrySynchronizationManager.getContext().getRetryCount();
        if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
            System.out.println("success");
        }
        else {
            throw new RuntimeException();
        }
    }

    @Bean
    RepublishMessageRecoverer repub(RabbitTemplate template) {
        RepublishMessageRecoverer repub =
                new RepublishMessageRecoverer(template, "DLX", "rk");
        return repub;
    }

    @Bean
    Queue dlq() {
        return new Queue("my-output.dlq");
    }

    @Bean
    DirectExchange dlx() {
        return new DirectExchange("DLX");
    }

    @Bean
    Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
    }

    @ServiceActivator(inputChannel = "my-input.group1.errors")
    void errorHandler(ErrorMessage message) {
        MessagingException mex = (MessagingException) message.getPayload();
        throw mex;
    }

    @RabbitListener(queues = "my-output.dlq")
    void dlqListen(Message<String> in) {
        System.out.println("DLQ:" + in);
    }

    @RabbitListener(queues = "my-output.group2")
    void outListen(String in) {
        if (in.equals("OKAFTERRETRY")) {
            System.out.println(in);
        }
        else {
            System.out.println("Should not see this:" + in);
        }
    }

    /*
     * We must move retries from the binder to stateful retries in the container so that
     * each retry is rolled back, to avoid multiple publishes to output.
     * See max-attempts: 1 in the yaml.
     * In order for stateful retry to work, inbound messages must have a unique message_id
     * property.
     */
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
        return (container, destinationName, group) -> {
            if ("group1".equals(group)) {
                container.setAdviceChain(RetryInterceptorBuilder.stateful()
                        .backOffOptions(1000, 2.0, 10000)
                        .maxAttempts(2)
                        .recoverer(recoverer(repub))
                        .keyGenerator(args -> {
                            // or generate a unique key some other way
                            return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
                                    .getMessageId();
                        })
                        .build());
            }
        };
    }

    private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
        return (args, cause) -> {
            repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
            throw new AmqpRejectAndDontRequeueException(cause);
        };
    }

}
spring:
  cloud:
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
            requeue-rejected: true
      bindings:
        input:
          destination: my-input
          group: group1
          consumer:
            max-attempts: 1
        output:
          destination: my-output
          producer:
            required-groups: group2
okAfterRetry
2021-01-20 12:45:24.385  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY

notOkAfterRetry
2021-01-20 12:45:39.336  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry, ..., x-exception-message...
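
The reason for note 2 (moving retries out of the binder) can be illustrated with another toy model in plain Java. This is not Spring Retry API; the class and method names are hypothetical. The listener mirrors the example above: it always sends to the output first and only succeeds on the second attempt (retry count 1).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: shows why in-transaction retries can duplicate output messages.
class RetryPlacementSketch {

    /** Always sends, then succeeds only when attempt == 1 (the first retry). */
    static void listener(List<String> pending, int attempt) {
        pending.add("OKAFTERRETRY");
        if (attempt != 1) {
            throw new RuntimeException("attempt " + attempt + " failed");
        }
    }

    /** Binder-style retry: every attempt runs inside the same transaction. */
    static List<String> retryInsideTransaction(int maxAttempts) {
        List<String> committed = new ArrayList<>();
        List<String> pending = new ArrayList<>(); // one transaction for all attempts
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                listener(pending, attempt);
                committed.addAll(pending); // commit once, after a successful attempt
                break;
            } catch (RuntimeException e) {
                // Retried in the same transaction; earlier sends stay pending.
            }
        }
        return committed;
    }

    /** Stateful-style retry: each attempt is a separate delivery and transaction. */
    static List<String> retryPerDelivery(int maxAttempts) {
        List<String> committed = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            List<String> pending = new ArrayList<>(); // fresh transaction per redelivery
            try {
                listener(pending, attempt);
                committed.addAll(pending);
                break;
            } catch (RuntimeException e) {
                // Exception reaches the container: rollback, then redelivery.
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(retryInsideTransaction(2).size()); // prints 2
        System.out.println(retryPerDelivery(2).size());       // prints 1
    }
}
```

With retries inside the transaction, the send from the failed first attempt is committed along with the send from the successful second attempt. With one transaction per delivery, the failed attempt's send is rolled back and only one message reaches the output.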