I have a RabbitListener in a Spring Boot app that retries and, whenever an exception occurs, sends the message to the dead letter queue, which is fine. However, if one particular exception occurs in my message listener, I do not want the message to be retried or sent to my dead letter queue, but I still want my transaction to be rolled back. Is there a way to do this?

As shown in the code below, I tried catching the exception, but then the consumer restarts. If I throw AmqpRejectAndDontRequeueException, the message is not retried but ends up on the dead letter queue.

Here is my listener method:

    @RabbitListener(queues = "#{T(com.myproject.RabbitBinding).PROCESS_MESSAGE.getQueue()}")
    public void onMessage(MyMessage message)
    {
        if (log.isDebugEnabled())
        {
            log.debug("Received message: {}", message);
        }

        try
        {
            process.run(message);
        }
        catch (IllegalStateException e)
        {
            // Swallowing the exception here is what I tried; the consumer then restarts.
            log.error("Exception occurred", e);
        }
    }

Whenever I catch the exception, it is logged, but then the consumer restarts, because I see this message:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer killOrRestart

Here is my Rabbit Config:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            Jackson2JsonMessageConverter messageConverter,
            RetryAdviceChainFactory deadLetterRetryAdviceChainFactory,
            PlatformTransactionManager transactionManager)
    {
        final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
        cf.setConnectionFactory(connectionFactory);
        cf.setConcurrentConsumers(rabbitConcurrentConsumers);
        cf.setErrorHandler(defaultErrorHandler);
        cf.setAdviceChain(deadLetterRetryAdviceChainFactory.createDefaultRetryChain());
        cf.setChannelTransacted(true);
        cf.setTaskExecutor(taskExecutor);
        cf.setMessageConverter(messageConverter);
        cf.setTransactionManager(transactionManager);
        return cf;
    }

    @Bean
    protected RetryAdviceChainFactory deadLetterRetryAdviceChainFactory(
            MessageRecoverer deadLetterMessageRecoverer)
    {
        RetryAdviceChainFactory factory = new RetryAdviceChainFactory();
        factory.setMessageRecoverer(deadLetterMessageRecoverer);
        return factory;
    }

1 Answer

Throw ImmediateAcknowledgeAmqpException.
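
A minimal sketch of what that could look like in the listener from the question. It assumes that IllegalStateException is the one exception that should not be retried, and it reuses `process`, `log` and `MyMessage` from the question's class; the log and exception messages are made up for illustration. Two things are worth verifying in your own setup: the Javadoc describes this exception as acknowledging the message "without rollback", so whether your database transaction still rolls back depends on how your transaction manager is wired, and your retry advice may need to be configured so that it does not retry this exception type.

    import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;

    // Inside the existing listener class from the question.
    @RabbitListener(queues = "#{T(com.myproject.RabbitBinding).PROCESS_MESSAGE.getQueue()}")
    public void onMessage(MyMessage message)
    {
        try
        {
            process.run(message);
        }
        catch (IllegalStateException e)
        {
            log.error("Acknowledging message without retry or dead-lettering", e);
            // Signals the container to ack the message instead of rejecting it,
            // so it should be neither redelivered nor routed to the dead letter queue.
            throw new ImmediateAcknowledgeAmqpException("Discarding message: " + e.getMessage(), e);
        }
    }

All other exceptions still propagate as before, so they should keep going through the existing retry chain and dead letter recoverer.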