0
votes

The Issue:

I have a node.js (8.10) AWS Lambda function that takes a json object and publishes it to an IOT topic. The function successfully publishes to the topic, however, once fired it is continuously called until I throttle the concurrency to zero to halt any further calling of the function.

I'm trying to figure out what I've implemented incorrectly that causes more than one instance the of the function to be called.

The Function:

Here is my function:

var AWS = require('aws-sdk');

exports.handler = function (event, context) {
    var iotdata = new AWS.IotData({endpoint: 'xxxxxxxxxx.iot.us-east-1.amazonaws.com'});
    var params = {
        topic: '/PiDevTest/SyncDevice',
        payload: JSON.stringify(event),
        qos: 0
    };

    iotdata.publish(params, function(err, data) {
        if (err) {
          console.log(err, err.stack);
        } else {
            console.log("Message sent.");
            context.succeed();
        }     
    });
};

My test json is:

{
  "success": 1,
  "TccvID": "TestID01"
}

The test console has a response of "null", but the IOT topic shows the data from the test json, published to the topic about once per second.

What I've Tried

-I've attempted to define the handler in it's own, non-anonymous function called handler, and then having the exports.handler = handler; This didn't produce any errors, but didn't successfully post to the iot topic either.

-I thought maybe the issues was with the node.js callback. I've tried implementing it and leaving it out (Current iteration above), but neither way seemed to make a difference. I had read somewhere that the function will retry if it errors, but I believe that only happens three times so it wouldn't explain the indefinite calling of the function.

-I've also tried calling the function from another lambda to make sure that the issue wasn't the aws test tool. This produced the same behavior, though.

Summary:

What am I doing incorrectly that causes this function to publish the json data indefinitely to the iot topic?

Thanks in advance for your time and expertise.

2
How is the Lambda function called/triggered exactly? Are you sure the Lambda function isn't setup to be triggered by new IoT events, thus triggering itself every time it publishes to that topic?Mark B
Right now it's only called from another Lambda or the aws Test feature. I'm not subscribing to the topic, only publishing to it, so I wouldn't expect it to be triggered by new data posted to the topic. I can test that though.Ryan Gibbs
So good thought, it does appear in a manually publish that same json to the topic, it invokes the lambda. Which tells me that I've got a lambda somewhere that is listening in on that topic indiscriminately and then calling this second lambda.Ryan Gibbs
@MarkB You got me there, the solution was related to another lambda who was listening to the same topic and invoking the lambda I was working on. If you want to post it as an answer, I'll accept it.Ryan Gibbs

2 Answers

0
votes

Use aws-iot-device-sdk to create a MQTT client and use it's messageHandler and publish method to publish your messages to IOT topic. Sample MQTT client code is below,

import * as DeviceSdk from 'aws-iot-device-sdk';
import * as AWS from 'aws-sdk';

let instance: any = null;

export default class IoTClient {

  client: any;
  /**
   * Constructor
   *
   * @params {boolean} createNewClient - Whether or not to use existing client instance
   */
  constructor(createNewClient = false, options = {}) {

  }

  async init(createNewClient, options) {
    if (createNewClient && instance) {
      instance.disconnect();
      instance = null;
    }

    if (instance) {
      return instance;
    }
    instance = this;
    this.initClient(options);
    this.attachDebugHandlers();
  }

  /**
   * Instantiate AWS IoT device object
   * Note that the credentials must be initialized with empty strings;
   * When we successfully authenticate to the Cognito Identity Pool,
   * the credentials will be dynamically updated.
   *
   * @params {Object} options - Options to pass to DeviceSdk
   */
  initClient(options) {
    const clientId = getUniqueId();

    this.client = DeviceSdk.device({
      region: options.region || getConfig('iotRegion'),

      // AWS IoT Host endpoint
      host: options.host || getConfig('iotHost'),

      // clientId created earlier
      clientId: options.clientId || clientId,

      // Connect via secure WebSocket
      protocol: options.protocol || getConfig('iotProtocol'),

      // Set the maximum reconnect time to 500ms; this is a browser application
      // so we don't want to leave the user waiting too long for reconnection after
      // re-connecting to the network/re-opening their laptop/etc...
      baseReconnectTimeMs: options.baseReconnectTimeMs || 500,
      maximumReconnectTimeMs: options.maximumReconnectTimeMs || 1000,

      // Enable console debugging information
      debug: (typeof options.debug === 'undefined') ? true : options.debug,

      // AWS access key ID, secret key and session token must be
      // initialized with empty strings
      accessKeyId: options.accessKeyId,
      secretKey: options.secretKey,
      sessionToken: options.sessionToken,
      // Let redux handle subscriptions
      autoResubscribe: (typeof options.debug === 'undefined') ? false : options.autoResubscribe,
    });
  }

  disconnect() {
    this.client.end();
  }

  attachDebugHandlers() {
    this.client.on('reconnect', () => {
      logger.info('reconnect');
    });

    this.client.on('offline', () => {
      logger.info('offline');
    });

    this.client.on('error', (err) => {
      logger.info('iot client error', err);
    });

    this.client.on('message', (topic, message) => {
      logger.info('new message', topic, JSON.parse(message.toString()));
    });
  }

  updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken) {
    this.client.updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken);
  }

  attachMessageHandler(onNewMessageHandler) {
    this.client.on('message', onNewMessageHandler);
  }

  attachConnectHandler(onConnectHandler) {
    this.client.on('connect', (connack) => {
      logger.info('connected', connack);
      onConnectHandler(connack);
    });
  }

  attachCloseHandler(onCloseHandler) {
    this.client.on('close', (err) => {
      logger.info('close', err);
      onCloseHandler(err);
    });
  }

  publish(topic, message) {
    this.client.publish(topic, message);
  }

  subscribe(topic) {
    this.client.subscribe(topic);
  }

  unsubscribe(topic) {
    this.client.unsubscribe(topic);
    logger.info('unsubscribed from topic', topic);
  }
}

***getConfig() is to get environment variables from a yml file or else you can directly specify it here.

0
votes

While he only posted it as an comment, MarkB pointed me in the correct direction.

The problem was the solution was related to another lambda who was listening to the same topic and invoking the lambda I was working on. This resulted in circular logic as the exit condition was never met. Fixing that code solved this issue.