How can I get IoT Analytics to create a new row in the datastore for each element within the received JSON array?
The simplest approach is to add a Lambda Activity to your Pipeline and have it parse each incoming JSON payload into the desired structure. The exact parsing depends somewhat on the 'raw' structure of the messages sent to the Channel.
So, for instance, we can send data to the Channel via CLI batch-put-message, like so:
aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"array\": [{\"Field1\": \"Value1\", \"Field2\": \"Value2\", \"Field3\": \"Value3\"},{\"Field1\": \"AnotherValue1\", \"Field2\": \"AnotherValue2\", \"Field3\": \"AnotherValue3\"}]}"}]'
The Channel would then have a single message structured like this:
{
    "messageId": "message1",
    "payload": {
        "array": [
            {
                "Field1": "Value1",
                "Field2": "Value2",
                "Field3": "Value3"
            },
            {
                "Field1": "AnotherValue1",
                "Field2": "AnotherValue2",
                "Field3": "AnotherValue3"
            }
        ]
    }
}
If your Pipeline has a Lambda Activity, the message(s) from the Channel will be passed to your Lambda function in the event argument.
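For reference, a Pipeline that wires such a Lambda Activity between the Channel and the Data Store could be created like this; sample_pipeline, sample_datastore, and the activity names are illustrative (the Lambda is the sample_lambda created below), and batchSize controls how many messages are handed to each invocation:

aws iotanalytics create-pipeline --pipeline-name sample_pipeline --pipeline-activities '[
    {"channel": {"name": "channel_activity", "channelName": "sample_channel", "next": "lambda_activity"}},
    {"lambda": {"name": "lambda_activity", "lambdaName": "sample_lambda", "batchSize": 1, "next": "datastore_activity"}},
    {"datastore": {"name": "datastore_activity", "datastoreName": "sample_datastore"}}
]'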
I created a simple Lambda function (Python 3.7) with the AWS Lambda console's inline editor and named it sample_lambda:
import sys
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    # This can be handy to see the raw structure of the incoming event;
    # it will log to the matching CloudWatch log group:
    #   /aws/lambda/<name_of_the_lambda>
    # logger.info("raw event: {}".format(event))

    parsed_rows = []

    # Depending on the batchSize setting of the Lambda Pipeline Activity,
    # you may receive multiple messages in a single event
    for message_payload in event:
        if 'array' in message_payload:
            # Emit one row (dict) per element of the JSON array
            for row in message_payload['array']:
                parsed = {}
                for key, value in row.items():
                    parsed[key] = value
                parsed_rows.append(parsed)

    # Whatever is returned here is what the Pipeline passes downstream
    return parsed_rows
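To sanity-check the parsing before deploying, you can call the handler locally with a hand-built event; this sketch assumes the event arrives as a plain list of message payloads, matching the structure shown earlier:

# Local smoke test; context is unused by the handler, so None is fine here
sample_event = [{
    "array": [
        {"Field1": "Value1", "Field2": "Value2", "Field3": "Value3"},
        {"Field1": "AnotherValue1", "Field2": "AnotherValue2", "Field3": "AnotherValue3"}
    ]
}]
print(lambda_handler(sample_event, None))
# [{'Field1': 'Value1', 'Field2': 'Value2', 'Field3': 'Value3'},
#  {'Field1': 'AnotherValue1', 'Field2': 'AnotherValue2', 'Field3': 'AnotherValue3'}]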
I then added the permission allowing IoT Analytics to invoke the Lambda function, via the CLI:
aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction
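With that permission in place, the existing Channel data can be re-run through the updated Pipeline; assuming the illustrative pipeline name from above:

aws iotanalytics start-pipeline-reprocessing --pipeline-name sample_pipeline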
After reprocessing the Pipeline, the parsed rows are placed in the Data Store as individual rows; executing the DataSet, I get this net result:
"array","field1","field2","field3","__dt"
,"Value1","Value2","Value3","2019-04-26 00:00:00.000"
,"AnotherValue1","AnotherValue2","AnotherValue3","2019-04-26 00:00:00.000"