1
votes

The application uses java 10, spring amqp and rabbitmq.

The system has a dead letter queue where we send some messages (they couldn't be processed as expected because of database unavailability).

For now, database availability is checked every X seconds and, if available only, we re-queue messages to their original queue. Otherwise we do nothing and messages stays in the dead letter queue.

When re-queued to original queue, messages can go back to dead letter queue again and see the x-death header count growing.

For some reasons, we would like to process dead-lettered messages that have count >= 5 (for example) and re-queue others to the dead letter queue.

I need to basic ack the message first to check the x-death count header, then send it to the original queue if count is big enough, else re-queue in dead letter queue.

I can't manage to re-queue to dead letter queue because the basic get in not inside a listener: throwing AmqpRejectAndDontRequeueException doesn't work as the exception is not thrown inside a rabbitmq listener object.

I tried throwing the exception inside a receiveAndCallback method, but this seems not better:

rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {

        @Override
        public Object handle(Message message) {
            Long messageXdeathCount = null;
            if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
                List<Map<String, ?>> xdeathHeader =
                        (List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
                        "x-death");
                if (null != xdeathHeader && null != xdeathHeader.get(0)) {
                    messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
                }
            }
            if (messageXdeathCount == null) {
                messageXdeathCount = 0L;
            }
            if (messageXdeathCount >= 5) {
                resendsMessage(message);
            } else {
                // this does not reject the message
                throw new AmqpRejectAndDontRequeueException("rejected");
            }
            return null;
        }
    });
    return receive;
}

After this method execution, the message is not rejected as I expect and is away from the queue (it has been acked).

Here is the exchange and queue declaration:

@Bean
public Exchange exchange() {
    TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
    admin().declareExchange(exchange);
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", EXCHANGE);
    Queue queue = new Queue("queueName", true, false, false, args);
    admin().declareQueue(queue);
    Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    admin().declareBinding(binding);
    return exchange;
}

How can reject the messages in the dead letter queue without using the AmqpRejectAndDontRequeueException? Is is possible for an exchange to have x-dead-letter-exchange set to self?

Thanks for your help

UPDATE

I tried another way, with channel get and reject:

// exchange creation
@Bean
public Exchange exchange() throws IOException {
    Connection connection = connectionFactory().createConnection();
    Channel channel = channel();
    channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", EXCHANGE);
    channel.queueDeclare("queueName", true, false, false, args);
    channel.queueBind("queueName", EXCHANGE, routingKey);
    return exchange;
}

Message get and ack or reject:

GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
    List<Map<String, ?>> xdeathHeader =
            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
    if(null != xdeathHeader && null != xdeathHeader.get(0)) {
        messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
    }
}
if (messageXdeathCount == null) {
    messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    MessageProperties messageProps =
            messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
    resendsMessage(new Message(response.getBody(), messageProps));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
    if(response.getProps().getHeaders().get("x-death") == null) {
        response.getProps().getHeaders().put("x-death", new ArrayList<>());
    }
    if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
        ((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
    }
    ((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
            "count", messageXdeathCount + 1);
    channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}

First I realized that it was quite ugly, then that messages cannot be updated between get and rejected. It there a way to use channel.basicReject and update the x-death count header?

2

2 Answers

1
votes

receiveAndReply() methods currently do not provide control over the acknowledging of the received message. Feel free to open a New Feature Request.

You can use a listener container instead to get the flexibility you need.

EDIT

You can drop down to the rabbitmq API...

rabbitTemplate.execute(channel -> {
    // basicGet, basicPublish, ack/nack etc here
});
0
votes

I could use the channel basic methods:

GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = 0L;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
    List<Map<String, ?>> xdeathHeader =
            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
    if(null != xdeathHeader && null != xdeathHeader.get(0)) {
        for (Map<String, ?> map : xdeathHeader) {
            Long count = (Long) map.get("count");
            messageXdeathCount += count == null ? 0L : count;
        }
    }
}
if (messageXdeathCount >= 5) {
    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
    resendsMessage(new Message(response.getBody(), messageProps));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
    channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
}

the issue in the update part of my question was in last line:

channel.basicGet(queueName, true);

the boolean indicates if the message should be requeued or not: if not requeued, it goes to exchange letter and increments count x-death header, as expected. Boolean updated to false fixed the issue.