3 votes

I have an application that writes to a DynamoDB table, and I'm trying to get Kinesis to do my aggregation and then write the aggregated data to another DynamoDB table.

Streams are enabled on my DynamoDB table, and I have a Lambda trigger on the stream as follows:

'use strict';

var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();

exports.handler = (event, context, callback) => {
    event.Records.forEach((record) => {

        var myValue = record.dynamodb.NewImage.myValue.N;
        var partitionKey = record.key.S;
        var data = '{"VALUE":"' + myValue + '"}';

        var recordParams = {
            Data: data,
            PartitionKey: partitionKey,
            StreamName: 'MyStreamName'
        };

        console.log('Try Put to Kinesis Stream');

        kinesis.putRecord(recordParams, function(err, data) {
            if (err) {
                console.log('Failed Put');
            } else {
                console.log('Successful Put');
            }
        });
    });
};

This writes successfully to my Kinesis Stream when I have three or four elements in the Lambda test event.

When I enable my trigger, it does not write to my Kinesis Stream at all. There appear to be about 100 elements coming in at a time. In CloudWatch I see the 'Try Put to Kinesis Stream' message, but I don't even see the Success/Failure messages.

Am I doing something completely wrong, or is there a better approach to this problem?

If DynamoDB's stream could feed straight into Kinesis Analytics that would be my first prize :)


2 Answers

4 votes

Your mistake is that your Lambda function does not wait until all of the kinesis.putRecord calls have finished.

In Node.js the programming model is based on callbacks: you make an asynchronous request, and the callback is invoked when the request completes. So the request is not finished when the function returns; it is finished when the callback fires. Your handler returns as soon as the forEach loop ends, which can be before any of the puts have completed, at which point Lambda can freeze or terminate the process.
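To make the timing concrete, here is a minimal sketch (reusing the kinesis client and stream name from the question) that shows the loop completing before any callback runs:

'use strict';
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();

// Illustration only: each putRecord returns immediately, and its
// callback fires later, after the network round trip.
['a', 'b', 'c'].forEach(function (key) {
    kinesis.putRecord({
        Data: '{"VALUE":"1"}',
        PartitionKey: key,
        StreamName: 'MyStreamName'
    }, function (err, data) {
        console.log('callback for', key); // runs after the loop, if at all
    });
});
console.log('loop done'); // logs first; the handler would return here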

Two solutions to the problem:

Keep track of the completed callbacks yourself

'use strict';
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();
exports.handler = (event, context, callback) => {
    // The counter must live outside the loop so that every putRecord
    // callback increments the same shared value.
    var completed = 0;
    event.Records.forEach((record) => {
        var myValue = record.dynamodb.NewImage.myValue.N;
        var partitionKey = record.key.S;
        var data = '{"VALUE":"' + myValue + '"}';
        var recordParams = {
            Data: data,
            PartitionKey: partitionKey,
            StreamName: 'MyStreamName'
        };
        console.log('Try Put to Kinesis Stream');
        kinesis.putRecord(recordParams, function(err, data) {
            if (err) {
                console.log('Failed Put');
                // Short-circuit on failure so the error is reported
                // without waiting for the remaining puts.
                completed = event.Records.length;
            } else {
                console.log('Successful Put');
                completed += 1;
            }
            // Signal completion only after all records are handled.
            if (completed === event.Records.length) {
                console.log('All done');
                callback(err);
            }
        });
    });
};

or use a library like async: https://www.npmjs.com/package/async
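If you would rather not maintain the counter yourself, here is a minimal sketch of the same handler using async.each, reusing the field names from the question. async.each invokes its final callback once every put has completed, or as soon as one of them fails:

'use strict';
var AWS = require('aws-sdk');
var async = require('async');
var kinesis = new AWS.Kinesis();

exports.handler = (event, context, callback) => {
    async.each(event.Records, function (record, done) {
        var recordParams = {
            Data: '{"VALUE":"' + record.dynamodb.NewImage.myValue.N + '"}',
            PartitionKey: record.key.S,
            StreamName: 'MyStreamName'
        };
        // done receives putRecord's error (if any), so a failed put
        // aborts the batch and surfaces the error to Lambda.
        kinesis.putRecord(recordParams, done);
    }, function (err) {
        // Runs only once, after every record has been handled.
        callback(err);
    });
};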

0 votes

It seems to me that part of your overall problem (besides needing to call callback, per hellomichibye) and the behaviour you describe in the comments could come from how you are building the value for Data. Instead of manually concatenating a JSON string for Data, try using JSON.stringify so that the input will always be properly formatted.
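For example, reusing myValue from the question's handler:

// JSON.stringify handles quoting and escaping for you, so the
// resulting string is always valid JSON.
var data = JSON.stringify({ VALUE: myValue }); // e.g. '{"VALUE":"42"}'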