I need to synchronize a base client and a local client over MQTT: when one client publishes, the other should receive the message.

  • If my MQTT broker is down, I need to stop sending messages, save the messages somewhere, wait for a connection, then continue sending.

  • If my local or base client goes down briefly, I need to save the messages that were not sent, then send them once the base/local client is back up.

I'm working with Node.js and can't figure out how to implement this.

These are my handlers for when I connect to or disconnect from my MQTT broker.

    client.on('connect',()=>{
      store.state = true;
      run(store).then((value)=>console.log('stop run'));
    });

    client.on('offline',()=>{
      store.state = false;
      console.log('offline');
    });

This is my run function. I use store.state to decide if I should stop this interval. But this code does not seem to be a good way to implement my concept.

    function run(store) {
      return new Promise((resolve,reject)=>{
        let interval = setInterval(()=>{
          if (!store.state) {
            clearInterval(interval);
            resolve(true);
          }
          else if (store.queue.length > 0) {
            let data = store.queue.pop();
            let res = client.publish('push',JSON.stringify(data),{qos:2});
          }
        },300)
      });
    }

How should I implement a function that keeps sending, stops on disconnect, and resumes sending once reconnected?

What are you trying to fix? Simpler code, handling of multiple queued messages on reconnect, or code for reconnecting? - MBer
@MBer I don't think setInterval with a 300 ms delay is good. I need a solution that looks better and is more flexible. I can also lose a message, because I remove it from my queue before it is sent: what happens if res = client.publish(..) is false? - Nam

1 Answer


"I don't think setInterval with a 300 ms delay is good."

If you want something that "always runs", at set intervals and in spite of any errors inside the loop, setInterval() makes sense. You are right that queued messages can be cleared faster than "once every 300 ms".

Since MQTT.js has a built-in queue, you could simplify a lot by using it. However, your messages are published to a topic called "push", so I guess you want them delivered in the order they were queued. This answer keeps your queue and focuses on sending the next message as soon as the previous one is confirmed.
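For reference, relying on the built-in queue is mostly a matter of connection options. The following is only a sketch: `clientId`, `clean`, `reconnectPeriod` and `queueQoSZero` are MQTT.js connect options, but the client id and the broker URL are placeholders I made up. With a fixed client id and `clean: false`, the broker should keep QoS 1/2 messages for an existing subscription while that client is briefly offline. Note that the default outgoing store of MQTT.js lives in memory, so it does not survive a process restart.

```javascript
// Sketch only: the values below are assumptions, adjust them for your setup.
const options = {
  clientId: 'base-client-1', // hypothetical id; must stay the same across reconnects
  clean: false,              // request a persistent session from the broker
  reconnectPeriod: 1000,     // milliseconds between automatic reconnect attempts
  queueQoSZero: true,        // also queue outgoing QoS 0 messages while offline
};

// With the mqtt package installed, the options would be used like this:
// const mqtt = require('mqtt');
// const client = mqtt.connect('mqtt://localhost:1883', options);
// client.subscribe('push', { qos: 2 });
```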

"What happens if res = client.publish(..) is false?"

Good point! If you want to make sure a message arrives, it is better to remove it from the queue only after the publish has succeeded. For this, you need to read the value without removing it, and use the callback argument to find out what happened (publish() is asynchronous, so its return value does not tell you whether the message was delivered). If that were the only change, it might look like:

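    // Peek at the oldest message without removing it. This assumes new
    // messages are appended with push(), so index 0 is the oldest one.
    let data = store.queue[0];
    client.publish('push', JSON.stringify(data), {qos: 2}, (err) => {
      if (!err) {
        store.queue.shift(); // Remove only after the publish is confirmed
      }
      // Ready for next publish; call this function again
    });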
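To make the "call this function again" step concrete, here is a small sketch that runs without a broker. `makeFakeClient` and `sendNext` are names I invented for illustration; the fake client only mimics the `publish(topic, payload, options, callback)` shape and calls back synchronously, whereas a real client calls back later. It assumes new messages are appended with push(), so it takes from the front with shift() to keep first-in, first-out order.

```javascript
// Hypothetical stand-in for the MQTT client: the first publish of 'b' fails,
// every other publish succeeds. A real client invokes the callback asynchronously.
function makeFakeClient() {
  const sent = [];
  let failedOnce = false;
  return {
    sent,
    publish(topic, payload, opts, cb) {
      if (payload === '"b"' && !failedOnce) {
        failedOnce = true;
        cb(new Error('simulated failure'));
        return;
      }
      sent.push(JSON.parse(payload));
      cb(null);
    },
  };
}

// Publish the oldest message; remove it only once the callback reports success.
function sendNext(client, store) {
  if (store.queue.length === 0) return;
  const data = store.queue[0]; // peek, do not remove yet
  client.publish('push', JSON.stringify(data), { qos: 2 }, (err) => {
    if (err) return;           // keep the message queued for a later attempt
    store.queue.shift();
    sendNext(client, store);   // ready for the next message
  });
}

const client = makeFakeClient();
const store = { queue: ['a', 'b', 'c'] };

sendNext(client, store);   // 'a' goes out, 'b' fails and stays queued
console.log(store.queue);  // [ 'b', 'c' ]
sendNext(client, store);   // retry: 'b' and 'c' go out
console.log(client.sent);  // [ 'a', 'b', 'c' ]
```

The failed message stays at the front of the queue, so nothing is lost and the order is preserved on the retry.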

Extending that to a promise-based run():

    // Wrap the callback-style publish in a promise that resolves to true on success
    function publishFromQueue(data) {
      return new Promise((resolve) => {
        client.publish('push', JSON.stringify(data), {qos: 2}, (err) => {
          resolve(!err);
        });
      });
    }

    async function run(store) {
      while (store.queue.length > 0 && store.state) {
        let data = store.queue[0]; // Oldest message; leave it queued for now
        let res = await publishFromQueue(data);
        if (res) {
          store.queue.shift(); // Confirmed, so it is safe to remove
        }
      }
    }

This should deliver all the queued messages in order as soon as possible, without blocking. The only drawback is that run() returns once the queue is empty or the connection drops, so something has to start it again. You have two options:

  1. Call run() at set intervals, as you already do. This is slower, though you could set a shorter interval.
  2. Only call run() when needed, like:
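    let isRunning = false; // Global flag: true while run() is draining the queue

    function queueMessage(data) {
      store.queue.push(data);
      if (!isRunning) {
        isRunning = true;
        // run() is async, so clear the flag only after it has finished
        run(store).finally(() => {
          isRunning = false;
        });
      }
    }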

As long as you can call this instead of pushing to the queue directly, the code should come out at a similar length while being more immediate and efficient.
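Putting the pieces together, one possible arrangement is sketched below. It is not tested against a real broker: `FakeClient` is a made-up stand-in that confirms every publish on the next tick. Compared with the snippets above, the running flag is moved inside run(), so that the 'connect' handler and queueMessage() share the same guard and cannot start two drains at once.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for an MQTT.js client: records each payload and
// confirms the publish asynchronously, as a real client would.
class FakeClient extends EventEmitter {
  constructor() {
    super();
    this.sent = [];
  }
  publish(topic, payload, opts, cb) {
    setImmediate(() => {
      this.sent.push(JSON.parse(payload));
      cb(null);
    });
  }
}

const client = new FakeClient();
const store = { state: false, queue: [] };
let isRunning = false; // true while a drain is in progress

function publishFromQueue(data) {
  return new Promise((resolve) => {
    client.publish('push', JSON.stringify(data), { qos: 2 }, (err) => resolve(!err));
  });
}

async function run() {
  if (isRunning) return; // another call is already draining the queue
  isRunning = true;
  try {
    while (store.queue.length > 0 && store.state) {
      const ok = await publishFromQueue(store.queue[0]);
      if (ok) store.queue.shift(); // remove only after confirmation
    }
  } finally {
    isRunning = false;
  }
}

function queueMessage(data) {
  store.queue.push(data);
  return run(); // returns immediately if offline or already draining
}

client.on('connect', () => {
  store.state = true;
  run(); // resume with whatever was queued while offline
});
client.on('offline', () => {
  store.state = false; // the loop in run() stops after the current publish
});

// Usage: messages queued while offline go out in order after 'connect'.
queueMessage(1);
queueMessage(2);
client.emit('connect');
queueMessage(3);
```

With a real client, the `connect` and `offline` events come from MQTT.js itself, so the `client.emit('connect')` line is only needed for this simulation.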