I want to retry a message up to 3 times when the consumer fails, and then route it to another queue through a dead letter exchange. I have configured the queues and exchanges as follows:
Regular messages: exchange test_exchange, queue test_queue; test_queue is bound to test_exchange with routing key test_queue.
Dead letters: exchange test_dlq_exchange, queue test_dlq_queue; test_dlq_queue is bound to test_dlq_exchange with routing key test_dlq_queue.
In the RabbitMQ management UI, I have set "x-dead-letter-exchange" to "test_exchange".
Below is the code for the SimpleMessageListenerContainer:
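For reference, the same topology could be declared in code instead of through the UI. This is only a sketch of what I described above, not my actual setup: the class name TopologySketch, the direct exchange type, the durable flags, and the choice to put the "x-dead-letter-exchange" argument on test_queue are all assumptions, and the beans only reach the broker if a RabbitAdmin is present in the context.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names of beans are illustrative only.
@Configuration
public class TopologySketch {

    @Bean
    DirectExchange testExchange() {
        return new DirectExchange("test_exchange"); // exchange type is an assumption
    }

    @Bean
    DirectExchange testDlqExchange() {
        return new DirectExchange("test_dlq_exchange");
    }

    @Bean
    Queue testQueue() {
        Map<String, Object> args = new HashMap<>();
        // Value as stated in the post. A message rejected without requeue is
        // republished to this exchange with its original routing key, so under
        // these assumptions it would be routed back to test_queue, not to
        // test_dlq_queue.
        args.put("x-dead-letter-exchange", "test_exchange");
        return new Queue("test_queue", true, false, false, args);
    }

    @Bean
    Queue testDlqQueue() {
        return new Queue("test_dlq_queue", true);
    }

    @Bean
    Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with("test_queue");
    }

    @Bean
    Binding testDlqBinding() {
        return BindingBuilder.bind(testDlqQueue()).to(testDlqExchange()).with("test_dlq_queue");
    }
}
```

Writing it out this way makes it easier to compare the dead-letter argument against the exchange and queue names used by the retry configuration.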
    @Bean
    SimpleMessageListenerContainer getMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("test_queue");
        MessageListenerAdapter adapter = new MessageListenerAdapter();
        // configured my message listener class
        // configured Jackson2JsonMessageConverter as converter
        container.setMessageListener(adapter);
        // wrap listener invocations with the retry interceptor
        container.setAdviceChain(new Advice[] { retryAdvice() });
        return container;
    }
    // configuration for retryAdvice
    @Bean
    public MethodInterceptor retryAdvice() {
        ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
        backoffPolicy.setInitialInterval(10);   // milliseconds
        backoffPolicy.setMaxInterval(1000);     // milliseconds
        backoffPolicy.setMultiplier(2);
        RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory());
        retryTemplate.setQueue("test_dl_queue");
        return RetryInterceptorBuilder
                .stateful()
                .backOffPolicy(backoffPolicy)
                .maxAttempts(3)
                // after the attempts are exhausted, republish to this exchange / routing key
                .recoverer(new RepublishMessageRecoverer(retryTemplate, "test_dl_exchange", "test_queue"))
                .build();
    }
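To make the timing of these settings concrete, here is a small plain-Java sketch of the delays I would expect between attempts. It is not Spring Retry's code: the class BackoffSchedule and the method intervals are made up for illustration, and it assumes that a delay is applied between consecutive attempts (not after the last one), starting at the initial interval, multiplied after each delay, and capped at the maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models an exponential backoff schedule.
public class BackoffSchedule {

    // Returns the delay (in ms) applied before each retry; there are
    // maxAttempts - 1 delays because the first attempt is not delayed.
    static List<Long> intervals(long initial, double multiplier, long max, int maxAttempts) {
        List<Long> sleeps = new ArrayList<>();
        double current = initial;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            sleeps.add((long) Math.min(current, max)); // cap at the maximum interval
            current *= multiplier;                      // grow for the next retry
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Settings from the retry advice above: 10 ms initial, x2, 1000 ms cap.
        System.out.println(intervals(10, 2.0, 1000, 3));  // prints [10, 20]
        System.out.println(intervals(10, 2.0, 1000, 10)); // prints [10, 20, 40, 80, 160, 320, 640, 1000, 1000]
    }
}
```

Under these assumptions, three attempts add only about 30 ms of delay in total, so the retries would appear in the log almost back to back.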
My custom message listener is a message-driven POJO that receives only my MessageObject.
Since I am using stateful retry, I have enabled createMessageIds(true). In my message listener I invoke the target object's method again. After the container starts, the flow becomes cyclic:
a message is published to the queue -> the message listener invokes the target method, which fails with an exception -> the message is published to the queue again -> the message listener invokes the target method again, and so on. The message is never pushed to the dead letter exchange/queue, and the flow loops indefinitely.
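My understanding of stateful retry is that the failure count is kept between deliveries and looked up by a key derived from the message, which is why a message ID is needed. The plain-Java model below sketches that idea only. It is not Spring Retry's implementation, the names StatefulRetryModel, deliver, and deliveriesUntilRecovered are hypothetical, and the exact delivery on which the real recoverer runs may differ. It shows that such a counter can only reach the limit if every redelivery of a message maps to the same key, which is one thing I would like to rule out in my setup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of a retry counter that survives between deliveries.
public class StatefulRetryModel {

    static final int MAX_ATTEMPTS = 3;

    // Failure counts keyed by message ID (plays the role of a retry cache).
    private final Map<String, Integer> attempts = new HashMap<>();

    // Models one failed delivery. Returns true once the failure count for
    // this key reaches MAX_ATTEMPTS, i.e. the message would be recovered.
    boolean deliver(String messageId) {
        int count = attempts.merge(messageId, 1, Integer::sum);
        if (count >= MAX_ATTEMPTS) {
            attempts.remove(messageId); // state is cleared after recovery
            return true;
        }
        return false; // listener failed; the broker would redeliver
    }

    // Redelivers one always-failing message up to 'limit' times and returns
    // the delivery number on which it was recovered, or -1 if it never was.
    static int deliveriesUntilRecovered(boolean stableId, int limit) {
        StatefulRetryModel model = new StatefulRetryModel();
        for (int delivery = 1; delivery <= limit; delivery++) {
            // With an unstable ID, every redelivery looks like a new message.
            String key = stableId ? "msg-1" : UUID.randomUUID().toString();
            if (model.deliver(key)) {
                return delivery;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(deliveriesUntilRecovered(true, 100));  // prints 3
        System.out.println(deliveriesUntilRecovered(false, 100)); // prints -1
    }
}
```

In this model a stable ID ends the cycle on the third failed delivery, while an ID that changes on every redelivery never exhausts the counter, which looks like the endless loop described above.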
In the log I see the following:
o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee)
Can someone help me fix this issue?