1
votes

I want to implement retry functionality (when there is a problem in the consumer ) for 3 times and then the message should go to another queue ( dead letter exchange ). I have configured the queue / exchange as below

regular message exchange name: test_exchange message queue: test_queue test_queue bind to test_exchange with routing key test_queue

dead letter exchange name: test_dlq_exchange dead message queue: test_dlq_queue test_dlq_queue bind to test_dlq_exchange with routing key test_dlq_queue

In the RabbitMQ UI console , I have configured the "x-dead-letter-exchange" as "test_exchange"

Below is the code for SimpleMessageListenerContainer

  @Bean 
    SimpleMessageListenerContainer getMessageListenerContainer(){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("test_queue");

    MessageListenerAdapter adapter = new MessageListenerAdapter();
     //configured my message listener class 
     //configured Jackson2JsonMessageConverter as converter
     container.setMessageListener(adapter);
     container.setAdviceChain(new Advice[] {retryAdvice()}
     return container;
    }

    //configuration for retryAdvice

     @Bean 
     public MethodInterceptor retryAdvice{

        ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
        backoffPolicy.setInitialInterval(10);
        backoffPolicy.setMaxInterval(1000);
        backoffPolicy.setMultiplier(2);
        RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory());
        retryTemplate.setQueue("test_dl_queue");
        return RetryInterceptorBuilder
                    .stateful()
                    .backOffPolicy(backoffPolicy)
                    .maxAttempts(3)
                    .recoverer(
                       new RepublishMessageRecoverer
                       (retryTemplate,"test_dl_exchange","test_queue")).build();
    }  

My custom message listener is Message driven POJO which has just my MessageObject.

Since I am using stateful, I have enabled the createMessageIds(true). In my message listener I am invoking the targetobject's method again. After starting the container, the flow is going cyclic

i.e Publish the message to queue -> Message listener invoke's the target method based on some exception -> again publishing the message to queue -> message listener invoke's target method...etc. It is not pushing the message to dead letter exchange/queue and going infinite loop.

In the log I see as below

      o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee)

Can some one help me to fix this issue ?

1

1 Answers

0
votes

Since you are using the RepublishMessageRecoverer, configuring a DLX/DLQ on the broker does nothing. In order to route to the DLQ, you need a RejectAndDontRequeueRecover - when the retries are exhausted the message will be sent to the DLX/DLQ.

It looks like you are republishing to the same queue

(retryTemplate,"test_dl_exchange","test_queue"))

Hence the infinite loop.