1
votes

I am trying to use mosquitto (installed inside a VM with sudo apt-get install mosquitto) to publish/subscribe MQTT messages using Node.js and this async wrapper for MQTT.js: https://github.com/mqttjs/async-mqtt

After installing the mosquitto CLI publisher/subscriber clients on my local PC with sudo apt-get install mosquitto-clients I know they work because I can successfully monitor the publisher/subscriber sessions with these commands:

  • subscriber: mosquitto_sub -h ${MY_VM_IP_ADDRESS} -p 1883 -t "readings" -v -d
  • publisher: mosquitto_pub -h ${MY_VM_IP_ADDRESS} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d

I can see the message going from the publisher to the subscriber. However, when I send a message from Node.js instead, it never shows up in the subscriber CLI session.

  • What's wrong with the following javascript code?
  • How can I identify a publisher with an ID using Mqtt.js?
  • How can the subscriber filter by this ID?

I am assuming multiple publishers with different IDs can send messages to the same topic and multiple subscribers can filter by IDs from the same topic. I think this is possible, but maybe part of the reason the following code does not work is that I need to take proper care of the combination ID/topic?

This is a mocha spec that I execute to try to send a reading

const MQTT = require("async-mqtt");

/**
 * Logs an error to stderr, prefixed with the current ISO-8601 timestamp.
 *
 * Values carrying a truthy `response` property are reported through
 * `response.status` / `response.statusText`. Everything else is reported
 * with a stack trace.
 *
 * @param {*} err - The error (or any thrown value) to report. `null` and
 *   `undefined` are accepted and reported like any other non-Error value.
 * @returns {void}
 */
const consoleLogError = (err) => {
  const timestamp = new Date().toISOString();

  // Guard against null/undefined so the logger itself never throws.
  if (err && err.response) {
    console.error(`${timestamp} HTTP response error, ${err.response.status}: ${err.response.statusText}`);
    return;
  }

  // Print the error's own stack: wrapping it in a fresh Error would report
  // this logger as the origin and hide where the failure really happened.
  // Only non-Error values get a synthetic stack.
  const stack = err instanceof Error && err.stack
    ? err.stack
    : new Error(String(err)).stack;
  console.error(`${timestamp} No HTTP error, the stack: ${stack}`);
};

/**
 * Writes a message to stdout, prefixed with the current ISO-8601 timestamp.
 *
 * @param {*} msg - Value to print; it is interpolated into the output line.
 * @returns {void}
 */
const consoleLog = (msg) => {
  const timestamp = new Date().toISOString();
  const line = `${timestamp} ${msg}`;
  console.log(line);
};

/**
 * Builds a sample reading stamped with the current time and logs it.
 *
 * Example result:
 *   {"fooMetric":42.42, "created_at":"2018-12-24T10:42:08.057Z"}
 *
 * @returns {{fooMetric: number, created_at: string}} The generated reading.
 */
const generateReadingMsg = () => {
  const createdAt = new Date().toISOString();
  // Property order is kept so the serialized JSON matches the example above.
  const reading = { "fooMetric": 42.42, "created_at": createdAt };
  const serialized = JSON.stringify(reading);
  consoleLog(`New generated reading: ${serialized}`);
  return reading;
};

/**
 * Holder for the single MQTT connection used by the spec.
 *
 * `asyncInit` fills in `client` and `mqttTopic`; `asyncSend` publishes one
 * message on that topic and then closes the connection.
 */
const mqttSession = {};

/**
 * Creates the MQTT client and remembers the topic to publish on.
 *
 * The returned promise settles as soon as the client object exists; it does
 * NOT wait for the broker handshake. `asyncSend` is what waits for the
 * connection.
 *
 * @param {string} hostPort - Broker address as `host:port`.
 * @param {string} deviceId - Passed to the client as its `clientId`.
 * @param {string} mqttTopic - Topic later `asyncSend` calls publish on.
 * @returns {Promise<void>} Rejects if `MQTT.connect` throws synchronously.
 */
mqttSession.asyncInit = (hostPort, deviceId, mqttTopic) => {
  try {
    mqttSession.mqttTopic = mqttTopic;
    // NOTE(review): the `mqtts://` scheme requests TLS, so the broker must
    // expose a TLS listener on this port — confirm against the broker config.
    mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {
      keepalive: 10,
      clientId: deviceId,
      protocolId: 'MQTT',
      clean: false,
      protocolVersion: 4,
      reconnectPeriod: 1000,
      connectTimeout: 30 * 1000,
      rejectUnauthorized: false,
    });
    return Promise.resolve();
  } catch (err) {
    // Keep the contract of "always returns a promise" even on a sync failure.
    return Promise.reject(err);
  }
};

/**
 * Resolves once the session's client is connected.
 *
 * Resolves immediately when the client already reports `connected`, because
 * the "connect" event is not emitted again for an established connection.
 * Otherwise waits for the first "connect" or "error" event, whichever comes
 * first, and detaches the other listener so nothing is left behind.
 *
 * @returns {Promise<void>} Rejects with the emitted error if "error" fires
 *   before "connect".
 */
mqttSession._whenConnected = () => {
  const { client } = mqttSession;
  if (client.connected) {
    return Promise.resolve();
  }
  return new Promise((resolve, reject) => {
    const onConnect = () => {
      client.removeListener("error", onError);
      resolve();
    };
    const onError = (err) => {
      client.removeListener("connect", onConnect);
      reject(err);
    };
    client.once("connect", onConnect);
    client.once("error", onError);
  });
};

/**
 * Publishes one already-serialized message on the session topic, then closes
 * the client.
 *
 * @param {string} msgStr - Payload to publish.
 * @returns {Promise<*>} Settles with whatever `client.end()` settles with.
 *   Failures are logged through `consoleLogError` and re-thrown.
 */
mqttSession._send = (msgStr) => {
  return Promise.resolve()
    .then(() => mqttSession.client.publish(mqttSession.mqttTopic, msgStr))
    .then(() => mqttSession.client.end())
    .catch((err) => {
      consoleLogError(err);
      throw err;
    });
};

/**
 * Serializes `msgJson`, waits for the broker connection, publishes the
 * message and closes the client.
 *
 * The returned promise settles only after the publish and the disconnect have
 * completed (or failed), so a caller such as a mocha test can rely on it to
 * know the message really went out.
 *
 * @param {*} msgJson - Value to serialize with `JSON.stringify` and publish.
 * @returns {Promise<*>} Settles with the result of `_send`; rejects when the
 *   client is missing, the connection fails, or the publish/disconnect fails.
 */
mqttSession.asyncSend = (msgJson) => {
  const msgStr = JSON.stringify(msgJson);
  return Promise.resolve()
    .then(() => mqttSession._whenConnected())
    // `_send` logs its own failures, so this handler is placed before it and
    // only reports connection-phase errors; each failure is logged once.
    .catch((err) => {
      consoleLogError(err);
      throw err;
    })
    .then(() => mqttSession._send(msgStr));
};

// Mocha spec: opens an MQTT session before each test and publishes a single
// generated reading through it.
describe.only('MQTT readings', () => {

  // for the IP address check the VM details
  const brokerHost = "xxx.xxx.xxx.xxx";

  // Mocha waits on the returned promise before running the test.
  beforeEach(() => mqttSession.asyncInit(`${brokerHost}:1883`, "fooId", "readings"));

  it('should send a reading to the MQTT broker', () => {
    console.log(`TODO run "mosquitto_sub -h ${brokerHost} -p 1883 -t "readings" -v -d"`);
    console.log(`The following MQTT-send should be equivalent to: "mosquitto_pub -h ${brokerHost} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d"`);

    const reportResult = (result) => {
      console.log(`returned stuff from the MQTT session: ${result}`);
    };
    // Log the failure, then re-throw so mocha still marks the test as failed.
    const reportFailure = (err) => {
      consoleLogError(err);
      throw err;
    };

    const reading = generateReadingMsg();
    return mqttSession.asyncSend(reading)
      .then(reportResult)
      .catch(reportFailure);
  });

});
1

1 Answers

1
votes

First up, you can not identify which client published a given message on a topic at the MQTT * protocol level. That information just doesn't exist in any of the protocol level information. If you need that information you need to include it in the payload of the message you send and filter messages after they have been delivered.

As for the code, you are trying to connect to a secure MQTT broker using mqtts://

mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {

Whereas, unless you have specifically configured Mosquitto in your VM for TLS, it will be running normal unsecured MQTT on port 1883.

If you delete the s the code runs fine against my broker.

mqttSession.client = MQTT.connect(`mqtt://${hostPort}`, {

* This applies to MQTT v3.x. With the new MQTT v5.0 spec there is the option to add extra metadata, but again you would not be able to filter at subscription time, only once the message had been delivered.