0
votes

I want to do the following: when a message fails and falls to my dead letter queue, I want to wait 5 minutes and republishes the same message on my queue.

Today, using Spring Cloud Streams and RabbitMQ, I did the following code Based on this documentation:

@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer  retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

My MessageInputProcessor interface:

public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

And my properties file:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true


#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

With this code, I can read from dead letter queue, capture the header but I can't put it back to my queue (the line LOGGER.warn("Retrying message, {} attempts", retriesHeader); only runs once, even if I put a very slow time).

My guess is that the method bindOriginalToDelay is binding the exchange to a new queue, and not mine. However, I didn't find a way to get my queue to bind there instead of creating a new one. But I'm not even sure this is the error.

I've also tried to send to MessageInputProcessor.INPUT instead of MessageInputProcessor.INPUT_DESTINATION, but it didn't work as expected.

Also, unfortunately, I can't update Spring framework due to dependencies on the project...

Could you help me with putting back the failed message on my queue after some time? I really didn't want to put a thread.sleep there...

1

1 Answers

0
votes

With that configuration, myInput.group is bound to the delayed (topic) exchange myInput with routing key #.

You should probably remove spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true because you don't need the main exchange to be delayed.

It will also be bound to your explicit delayed exchange, with key myInput.group.

Everything looks correct to me; you should see the same (single) queue bound to two exchanges:

enter image description here

The myInput.group.dlq is bound to DLX with key myInput.group

You should set a longer TTL and examine the message in the DLQ to see if something stands out.

EDIT

I just copied your code with a 5 second delay and it worked fine for me (with turning off the delay on the main exchange).

Retrying message, 4 attempts

and

added to failed messages queue

Perhaps you thought it was not working because you have a delay on the main exchange too?