
I'm new to RabbitMQ and I want to modify a message before it reaches a queue. I have an exchange that must stay untouched. The client receives messages with a specific routing key, but there are a lot of them, and I want to filter them and change the body before they are published into the queue.

The messages published to the exchange look like this:

{
"_context_domain": "unsuitable",
"_msg_id": "1",
"_context_quota_class": null, 
"_context_read_only": false,
"_context_request_id": "1"
}

{
"_context_domain": "suitable",
"_msg_id": "2",
"_context_quota_class": null, 
"_context_read_only": false,
"_context_request_id": "2"
}

Is there any way to filter and modify them before consuming? For example:

...
channel.queueBind(QUEUE_NAME, "EXCHANGE_NAME", "ROUTING_KEY");
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        Gson mapper = new Gson();
        SomeObject object = mapper.fromJson(message, SomeObject.class);
        // Compare strings with equals(), not with = or ==
        if ("suitable".equals(object.getContext_domain())) {
            // publish somehow object.getMsg_id() into QUEUE_NAME
        }
    }
};

Is there any way to do it?

Hi! I'm not sure I understand what you want. Do you mean you want messages on a queue to be filtered and, depending on some criterion, requeued on another queue? - Jean-Sébastien Pédron
Sorry for the unclear question. As far as I can see, the only way is for the queue to consume ALL messages from the exchange. I couldn't find how to filter and change messages before they are published to the queue. Is that even possible? I don't want the message to be requeued; I want it to be filtered and changed when the queue receives it from the exchange. I really hope you understand. I'm really sorry if the question is unclear. - Serhiienko Oleksii
Don't be sorry, that's ok :-) I understand now and will try to answer your question. - Jean-Sébastien Pédron

1 Answer

The exchange types defined by AMQP do not allow filtering on the message body. You could use "topic" or "headers" exchanges, which route messages based on the routing key or on message headers, respectively.
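To illustrate the headers option, here is a minimal sketch of the binding arguments a queue could use so that only matching messages are routed to it. It assumes the publisher sets a message header named context_domain, which is a hypothetical name: in the question this value lives in the body, so this only helps if the publisher can be changed. The class and method names are made up for the example; "x-match" is the standard binding argument of headers exchanges.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds binding arguments for a "headers" exchange.
final class HeaderBinding {
    private HeaderBinding() {}

    static Map<String, Object> suitableOnly() {
        Map<String, Object> args = new HashMap<>();
        // "all": every listed header must match ("any" would need only one).
        args.put("x-match", "all");
        // Assumed header name; the publisher would have to set it.
        args.put("context_domain", "suitable");
        return args;
    }
}

// With the RabbitMQ Java client, the map would be passed as the last
// argument of queueBind (the routing key is ignored by headers exchanges):
//   channel.queueBind(QUEUE_NAME, "EXCHANGE_NAME", "", HeaderBinding.suitableOnly());
```

This only filters; the message still arrives unmodified.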

However, none of them lets you modify the message itself. If you want to do that, you need to develop your own RabbitMQ plugin implementing an exchange.

If you want to go down that road, it is quite easy to do. However, exchanges are not exactly designed for this purpose: they are simply routing tables. An exchange is not a process; it is basically a line in a table. So if you do that, the channel process in RabbitMQ will consume resources to perform whatever modifications you want. That also means other clients on the same connection may be blocked until the channel has finished modifying and queueing a message.

A more common way to achieve what you want is to use a consumer to handle all messages, do whatever filtering/modifications you need there, and publish the result to a second queue. This second queue would be consumed by your normal workers.
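As a rough sketch of that intermediate consumer, the filtering and rewriting step could look like the code below. It keeps only messages whose _context_domain is "suitable" and reduces them to their _msg_id, as in the question. The class name MessageFilter, the queue name filtered_queue, and the regex-based field extraction are all assumptions made to keep the example dependency-free; in real code a JSON parser such as the Gson mapper from the question would be a better fit.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper holding the filter/modify logic of the intermediate consumer.
final class MessageFilter {
    private MessageFilter() {}

    // Extracts a string-valued field from a flat JSON object.
    // Simplified stand-in for a real JSON parser (no escapes, no nesting).
    static Optional<String> field(String json, String key) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Returns the new body (the message id) for suitable messages,
    // or an empty Optional when the message should be dropped.
    static Optional<String> filterAndTransform(String body) {
        boolean suitable = field(body, "_context_domain")
                .filter("suitable"::equals)
                .isPresent();
        return suitable ? field(body, "_msg_id") : Optional.empty();
    }

    // Convenience overload for the raw bytes delivered by the broker.
    static Optional<String> filterAndTransform(byte[] body) {
        return filterAndTransform(new String(body, StandardCharsets.UTF_8));
    }
}

// Assumed wiring inside handleDelivery(...) of a consumer on the first queue,
// using the RabbitMQ Java client with manual acknowledgements:
//   Optional<String> out = MessageFilter.filterAndTransform(body);
//   if (out.isPresent()) {
//       // The default exchange ("") routes by queue name.
//       channel.basicPublish("", "filtered_queue", null,
//               out.get().getBytes(StandardCharsets.UTF_8));
//   }
//   channel.basicAck(envelope.getDeliveryTag(), false);
```

The workers would then consume from filtered_queue only, while the original exchange and its bindings stay as they are.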