I will explain what I want to achieve and then what I have tried so far (without success).

I have two Node services connected to each other through RabbitMQ, using a topic exchange.

What I want is to shut down the consumer C1 while messages are still being sent to something.orange.something. Then I want to restart C1 and receive all the messages it missed while it was down.

What happens now is that each time I restart my consumer, it creates a new queue and a new binding on my exchange with the same routing key. So I now have two queues receiving the same messages.

If I configure my queue with the option {exclusive: true}, I solve part of the problem: I no longer have queues without receivers. But the main problem remains: all messages sent while there is no active receiver are lost.

Is this possible?

Here is my code:

sender:

'use strict';

const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    let ex = 'direct_colors';
    let args = process.argv.slice(2);
    let colors = ['colors.en.green', 'colors.en.yellow', 'colors.es.red']

    ch.assertExchange(ex, 'topic', {durable: true});

    setInterval(() => {
      let color = colors[Math.floor(Math.random() * colors.length)];
      let msg = `This is a ${color} message`;
      // Buffer.from replaces the deprecated `new Buffer(...)` constructor
      ch.publish(ex, color, Buffer.from(msg));

      logatim[color.split('.').pop()].info(msg);
    }, 1000);
  });
});

receiver:

'use strict';

const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info');
const args = process.argv.slice(2);

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    var ex = 'direct_colors';

    ch.assertExchange(ex, 'topic', {durable: true});

    ch.assertQueue('', {exclusive: true, durable: true}, (err, q) => {
      logatim.green.info(' [*] Waiting for logs. To exit press CTRL+C');

      args.forEach((arg) => {
        ch.bindQueue(q.queue, ex, arg);
      });

      ch.consume(q.queue, (msg) => {
        logatim[msg.fields.routingKey.split('.').pop()].info(` [x] ${msg.content.toString()}`);
      });
    });
  });
});

1 Answer

You need named queues. When you declare the queue in your receiver, give it a well-known, constant name, something like

ch.assertQueue('my_service_1_queue', {durable: true}, ...

Basic examples of named queues are in the RabbitMQ tutorials.

When your consumer goes down and restarts, it will consume from the same named queue, which keeps collecting messages in the meantime. NOTE: you don't want an exclusive queue here, as it would be deleted when your consumer goes down.
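
To make that concrete, here is a minimal sketch of the receiver rewritten around a named, durable, non-exclusive queue. The helper name startConsumer and the onMessage callback are made up for this example, the queue name is just the placeholder from above, and the manual ack is my own addition rather than something the original code did. It assumes ch is a channel from amqplib's callback API, as in the question.

```javascript
'use strict';

const EXCHANGE = 'direct_colors';    // exchange name taken from the question
const QUEUE = 'my_service_1_queue';  // fixed, well-known name (placeholder)

// Hypothetical helper: declares the named queue, binds it to the topic
// exchange and starts consuming with manual acknowledgements.
// `ch` is expected to be an amqplib (callback API) channel.
function startConsumer(ch, bindingKeys, onMessage) {
  ch.assertExchange(EXCHANGE, 'topic', {durable: true});

  // Same name on every start and no `exclusive` flag, so a restarted
  // consumer reattaches to the existing queue instead of creating a new one.
  ch.assertQueue(QUEUE, {durable: true}, (err, q) => {
    if (err) throw err;

    bindingKeys.forEach((key) => ch.bindQueue(q.queue, EXCHANGE, key));

    ch.consume(q.queue, (msg) => {
      if (msg === null) return; // the broker cancelled this consumer
      onMessage(msg);
      ch.ack(msg); // acknowledge only after the message has been handled
    }, {noAck: false});
  });
}

// Wiring it up against a local broker (requires the amqplib package):
//
//   const amqp = require('amqplib/callback_api');
//   amqp.connect('amqp://localhost', (err, conn) => {
//     if (err) throw err;
//     conn.createChannel((err, ch) => {
//       if (err) throw err;
//       startConsumer(ch, process.argv.slice(2), (msg) => {
//         console.log(` [x] ${msg.fields.routingKey}: ${msg.content.toString()}`);
//       });
//     });
//   });
```

Keep in mind that the queue only starts collecting messages once it has been declared and bound, so the receiver has to run at least once before you shut it down. Messages published before that first binding exists are still dropped by the exchange.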