0
votes

I am receiving multiple data records at a time as a JSON array from an IoT device in my channel. The received message looks like this :

[
    {
      "Field1": "Value1",
      "Field2": "Value2",
      "Field3": "Value3"
    },
    {
      "Field1": "AnotherValue1",
      "Field2": "AnotherValue2",
      "Field3": "AnotherValue3"
    }
]

I create a dataset using the following SQL query :

SELECT * FROM mydatastore

When I run the data set, the result returned is :

array                                              __dt 
-----                                              -----
[{field1=Value1, field2=Value2, field3=Value3}]    2019-02-21 00:00:00.000

My desired result is :

Field1           Field2           Field3
------           ------           ------
Value1           Value2           Value3
AnotherValue1    AnotherValue2    AnotherValue3

How can I get IoT Analytics to create a new row in the datastore for each element within the received JSON array?

2

2 Answers

0
votes

How can I get IoT Analytics to create a new row in the datastore for each element within the received JSON array?

The simplest way should be to leverage a Lambda Activity on your Pipeline, and have it parse the single JSON payload into the desired structure. This depends somewhat on the 'raw' structure of the messages sent to the Channel.

So, for instance, we can send data to the Channel via CLI batch-put-message, like so:

aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"array\": [{\"Field1\": \"Value1\", \"Field2\": \"Value2\", \"Field3\": \"Value3\"},{\"Field1\": \"AnotherValue1\", \"Field2\": \"AnotherValue2\", \"Field3\": \"AnotherValue3\"}]}"}]'

The Channel would then have a single message structured like this:

{
  "messageId": "message1",
  "payload": {
    "array": [
      {
        "Field1": "Value1",
        "Field2": "Value2",
        "Field3": "Value3"
      },
      {
        "Field1": "AnotherValue1",
        "Field2": "AnotherValue2",
        "Field3": "AnotherValue3"
      }
    ]
  }
}

If your Pipeline has a Lambda Activity, then the message(s) from the Channel will be passed to your Lambda function in the event argument.

I created a simple Lambda function (using Python 3.7) using the AWS Lambda console inline editor, and named it sample_lambda:

import json
import sys
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    # This can be handy to see the raw structure of the incoming event
    # will log to the matching CloudWatch log:
    # /aws/lambda/<name_of_the_lambda>
    # logger.info("raw event: {}".format(event))

    parsed_rows = []

    # Depending on the batchSize setting of the Lambda Pipeline Activity,
    # you may receive multiple messages in a single event
    for message_payload in event:
        if 'array' in message_payload:
            for row in message_payload['array']:
                parsed = {}
                for key, value in row.items():
                    parsed[key] = value
                parsed_rows.append(parsed)

    return parsed_rows

I added the proper permissions so that the IoT-Analytics could invoke the lambda function via CLI:

aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction

Reprocessing the Pipeline, the parsed rows are placed in the DataStore; executing the DataSet, I get this net result:

"array","field1","field2","field3","__dt"
,"Value1","Value2","Value3","2019-04-26 00:00:00.000"
,"AnotherValue1","AnotherValue2","AnotherValue3","2019-04-26 00:00:00.000"
0
votes

Late reply:

Just thinking , IoT rule - select statement has lot of inbuilt clause/functions like 'as' capable of transforming input payload . please check , may be work out.

Example - https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-select.html

Incoming payload published on topic 'topic/subtopic': {"color":{"red":255,"green":0,"blue":0}, "temperature":50}

SQL:

SELECT color.red as red_value FROM 'topic/subtopic' Outgoing payload: {"red_value":255}