0
votes

I am creating a simple chat app where

  1. There can be multiple users and multiple chat rooms.
  2. Each chat room can hold multiple users.
  3. A rabbitMQ connection is established when the server starts.
  4. When a user connects to the server, a socket connection is opened in a rabbitMQ channel.
  5. On sending a message through socket from the client (user), rabbitmq channel pushes it to the exchange using a specific routing key.
  6. On consuming, if the socket is active then send through it & acknowledge it. If not don't acknowledge it and re queue it.

{

//Creating a new connection
amqp.connect('amqp://localhost', (err, conn) => {

conn.createChannel(function(err, ch) {

ch.assertExchange(exchangeName, 'direct', { durable: false });
const routingKey = // some routing key;    

wss.on('connection', (ws, req) => {
  // Creating a new queue for the user
  ch.assertQueue('', { exclusive: true, persist: true, durable: true }, (err, q) => {
    // Binds with the routing key
    ch.bindQueue(q.queue, exchangeName, routingKey);

    ch.consume(q.queue, (msg) => {
      if (ws.readyState === 1) {
        ch.ack(msg);
        ws.send(` [-] Received ${msg.content.toString()}`);
      } else {
        // What will come here so that it will be requeued to a new queue with the same routing id?
      }
    }, { noAck: false });
  });

  ws.on('message', (message) => {
    ch.publish(exchangeName, routingKey, new Buffer(message));
    console.log(`[x] Sent ${message} to the exchange with routingKey ${routingKey}`);
  });
}) 

This works correctly when all the users are in socket connected with the server. What I want to implement is when the socket connection dies for an user and if he misses a message based on the routing key (specific to the user), he should be able to receive those messages again whenever he reconnects back with a different queue. I feel this is possible since the messages can be kept in exchange if it is not acknowledged in any queue. But not sure how to implement it.

2

2 Answers

1
votes

Let me address several misconceptions ...

What I want to implement is when the socket connection dies for an user and if he misses a message based on the routing key (specific to the user), he should be able to receive those messages again whenever he reconnects back with a different queue

You are using exclusive queues. By definition, these queues are deleted when their associated connection and channel goes away. The messages in those queues are lost when the queue is deleted.

I feel this is possible since the messages can be kept in exchange if it is not acknowledged in any queue.

That's not how RabbitMQ works. Exchanges are used to route messages, and queues are what store them.

The following comment is incorrect:

If user1 and user2 on different host, clearly they should establish different channels. 'channel' is just something like connection, if two channels consume same queue, they can receive same message.

Being on different hosts and probably running in different processes, user1 and user2 will each establish their own connections and channels. Two consumers will never receive the same message from the same queue.

Getting back to your original problem, you need to declare your queues as not-exclusive and durable, and name them in such a way that when your application re-connections the app connects to the same queue. That way, the app can consume the messages that remain on the queue.

There are various ways to have RabbitMQ delete unused queues for you that are documented.


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

0
votes

Consumer should consume message with Acknowledgement, If a message is delivered to a consumer and not acknowledged before the consumer connection dropped, then RabbitMQ will redeliver it. You can get more detail from RabbitMQ doc.

In your situation, when the socket connection broken, you should force close the channel, then the RabbitMQ will know these message should redeliver.

To ensure that messages do survive when server restarts, set exchange 'durable': ch.assertExchange(exchangeName, 'direct', { durable: false });