I would like to achieve the following scenario in my application:

  1. If a business error occurs, the message should be sent from the incomingQueue to the deadLetter Queue and delayed there for 10 seconds
  2. Step 1 should be repeated 3 times
  3. After that, the message should be published to the parkingLot Queue
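
The three steps above boil down to one decision on every failure: dead-letter the message again, or park it. Here is a minimal sketch of that decision in plain Java. The class, enum and method names are hypothetical and only illustrate the rule; they are not part of Spring AMQP or of the code in this question.

```java
// Hypothetical helper: decides where a failed message should go next.
final class RetryRouting {

    enum Route { DEAD_LETTER, PARKING_LOT }

    /**
     * @param previousDeaths how many times the message has already been
     *                       dead-lettered (0 on the first failure)
     * @param maxRetries     how many delayed retries are allowed (3 in the scenario)
     */
    static Route routeOnFailure(long previousDeaths, int maxRetries) {
        // Once the allowed retries are used up, the message is parked for analysis.
        return previousDeaths >= maxRetries ? Route.PARKING_LOT : Route.DEAD_LETTER;
    }
}
```

With maxRetries = 3, the first three failures go through the delay loop and the fourth failure sends the message to the parking lot.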

[Image: Scenario to achieve]

[Image: Current RabbitMQ queue setup]

As the code below shows, I can already delay the message in the deadLetter Queue for a fixed amount of time, and the message then loops indefinitely between the incoming Queue and the deadLetter Queue. So far so good.

The main question: How can I intercept this loop and manually route the message to the parkingLot Queue (as described in step 3) for later analysis?

A secondary question: Can I achieve the same process with only one exchange?

Here is a shortened version of my two classes:

Configuration class

@Configuration
public class MailRabbitMQConfig {

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(incomingExchangeName);
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(deadLetterExchangeName);
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {

        return QueueBuilder.durable(incomingQueueName)
                .withArgument(
                        "x-dead-letter-exchange",
                        dlExchange().getName()
                )
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(parkingLotQueueName);
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with("#");
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(deadLetterQueueName)
                .withArgument("x-message-ttl", 10000)
                .withArgument("x-dead-letter-exchange", incomingExchange()
                    .getName())
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with("#");
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {

        return BindingBuilder.bind(parkingLotQueue)
                    .to(dlExchange)
                    .with(parkingLotRoutingKeyName);
    }
}

Consumer class

@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(MailDataExternalTemplate mailDataExternalTemplate) throws Exception {

        try {
            // business logic here
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("Failed to handle a business logic");
        }

        return Boolean.TRUE;
    }
}

I know I could define an additional listener for the deadLetter Queue in the Consumer class like this:

@RabbitListener(queues = "${mail.rabbitmq.queue.deadletter}")
public void receivedMessageFromDlq(Message failedMessage) throws Exception {
    // Logic to count x-retries header property value and send a failed message manually
    // to the parkingLot Queue
}

However, this does not work as expected: the listener is invoked as soon as the message reaches the head of the deadLetter Queue, so the message is consumed without being delayed.

Thank you in advance.


EDIT: With the help of @ArtemBilan and @GaryRussell I was able to solve the problem. The main hints are in their comments on the accepted answer. Thank you for the help! Below you will find a new diagram of the messaging process, followed by the updated Configuration and Consumer classes. The main changes were:

  • The definition of the routes between the incoming exchange -> incoming queue and the dead letter exchange -> dead letter queue in the MailRabbitMQConfig class.
  • The loop handling with the manual publishing of the message to the parking lot queue in the Consumer class

[Image: diagram of the updated messaging process]

Configuration class

@Configuration
public class MailRabbitMQConfig {
    @Autowired
    public MailConfigurationProperties properties;

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getIncoming());
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getDeadletter());
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(properties.getRabbitMQ().getQueue().getIncoming())
            .withArgument(                 
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                dlExchange().getName()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                properties.getRabbitMQ().getRoutingKey().getDeadLetter()
            )
            .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(properties.getRabbitMQ().getQueue().getParkingLot());
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
            .bind(incomingQueue())
            .to(incomingExchange())
            .with(properties.getRabbitMQ().getRoutingKey().getIncoming());
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
            .durable(properties.getRabbitMQ().getQueue().getDeadLetter())
            .withArgument(                      
                properties.getRabbitMQ().getMessages().X_MESSAGE_TTL_HEADER,
                properties.getRabbitMQ().getMessages().getDelayTime()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                incomingExchange().getName()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                properties.getRabbitMQ().getRoutingKey().getIncoming()
            )
            .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
            .bind(dlQueue())
            .to(dlExchange())
            .with(properties.getRabbitMQ().getRoutingKey().getDeadLetter());
    }

    @Bean
    public Binding bindParkingLot(
        Queue parkingLotQueue,
        TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
            .to(dlExchange)
            .with(properties.getRabbitMQ().getRoutingKey().getParkingLot());
    }
}

Consumer class

@Component
public class Consumer {
    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    public MailConfigurationProperties properties;

    @Autowired
    protected EmailClient mailJetEmailClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(
        @Payload MailDataExternalTemplate mailDataExternalTemplate,
        Message amqpMessage
    ) {
        logger.info("Received message");

        try {
            final EmailTransportWrapper emailTransportWrapper = mailJetEmailClient.convertFrom(mailDataExternalTemplate);

            mailJetEmailClient.sendEmailUsing(emailTransportWrapper);
            logger.info("Successfully sent an E-Mail");
        } catch (Exception e) {
            int count = getXDeathCountFromHeader(amqpMessage);
            logger.debug("x-death count: {}", count);

            if (count >= properties.getRabbitMQ().getMessages().getRetryCount()) {
                this.rabbitTemplate.send(
                     properties.getRabbitMQ().getExchange().getDeadletter(),
                     properties.getRabbitMQ().getRoutingKey().getParkingLot(),
                     amqpMessage
                );
                return Boolean.TRUE;
            }

            throw new AmqpRejectAndDontRequeueException("Failed to send an E-Mail");
        }

        return Boolean.TRUE;
    }

    private int getXDeathCountFromHeader(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER) == null) {
            return 0;
        }

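        // Cast to the List interface rather than ArrayList, so the code does not
        // depend on the concrete list type used for the header value.
        //noinspection unchecked
        List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers
            .get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER);
        // The first x-death entry describes the most recent dead-lettering event.
        Long count = (Long) xDeath.get(0).get("count");
        return count.intValue();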
    }
}
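
The x-death lookup in getXDeathCountFromHeader can also be written without the unchecked cast. The sketch below is plain Java with no Spring dependency. The class name XDeathHeader, the method name retryCount and the choice to match entries by queue name are assumptions made for illustration, not part of the original code. It relies only on the x-death header being a list of maps that carry "queue" and "count" entries, which is how RabbitMQ records dead-lettering events.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads the dead-letter count for one queue from the headers.
final class XDeathHeader {

    static long retryCount(Map<String, Object> headers, String queueName) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List<?>)) {
            return 0L; // header missing or malformed: the message was never dead-lettered
        }
        for (Object entry : (List<?>) raw) {
            if (!(entry instanceof Map<?, ?>)) {
                continue;
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            Object count = death.get("count");
            // Each entry belongs to one queue; pick the one we are asked about.
            if (queueName.equals(death.get("queue")) && count instanceof Number) {
                return ((Number) count).longValue();
            }
        }
        return 0L;
    }
}
```

Matching on the queue name avoids depending on the order of the x-death entries, and reading the count as a Number avoids a ClassCastException if the value is not a Long.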

1 Answer

To delay the delivery of a message to a queue, consider using a delayed exchange: https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/_reference.html#delayed-message-exchange.
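
As a rough sketch of what this could look like with Spring AMQP 2.x: an exchange is marked as delayed, and the delay is set per message through a MessagePostProcessor. The exchange name, routing key and class name below are hypothetical, and the broker needs the rabbitmq_delayed_message_exchange plugin enabled for this to work.

```java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayedRetryConfig {

    // Hypothetical names, for illustration only.
    static final String RETRY_EXCHANGE = "mail.retry.delayed";
    static final String RETRY_ROUTING_KEY = "mail.retry";

    @Bean
    TopicExchange delayedRetryExchange() {
        TopicExchange exchange = new TopicExchange(RETRY_EXCHANGE);
        // Declares the exchange as a delayed exchange (needs the broker plugin).
        exchange.setDelayed(true);
        return exchange;
    }

    // Publishes the payload so that it is routed only after delayMillis have passed.
    static void sendWithDelay(RabbitTemplate rabbitTemplate, Object payload, int delayMillis) {
        rabbitTemplate.convertAndSend(RETRY_EXCHANGE, RETRY_ROUTING_KEY, payload, message -> {
            message.getMessageProperties().setDelay(delayMillis);
            return message;
        });
    }
}
```

With this approach the delay is applied by the exchange before routing, so no TTL queue is needed for the waiting period.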

As for sending manually to the parkingLot queue, simply use RabbitTemplate and send the message using the queue's name as the routing key:

/**
 * Send a message to a default exchange with a specific routing key.
 *
 * @param routingKey the routing key
 * @param message a message to send
 * @throws AmqpException if there is a problem
 */
void send(String routingKey, Message message) throws AmqpException;

All the queues are bound to the default exchange via their names as routing keys.
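
A minimal sketch of such a send, assuming a RabbitTemplate and the parking lot queue name are injected (the class and field names are hypothetical). The two-argument send shown above uses the template's configured exchange, which is the default exchange "" unless it was changed. The sketch passes "" explicitly so it does not depend on that setting.

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Hypothetical helper that parks a failed message without any extra binding.
public class ParkingLotPublisher {

    private final RabbitTemplate rabbitTemplate;
    private final String parkingLotQueueName;

    public ParkingLotPublisher(RabbitTemplate rabbitTemplate, String parkingLotQueueName) {
        this.rabbitTemplate = rabbitTemplate;
        this.parkingLotQueueName = parkingLotQueueName;
    }

    public void park(Message failedMessage) {
        // "" is the default exchange; it routes by queue name.
        rabbitTemplate.send("", parkingLotQueueName, failedMessage);
    }
}
```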