0
votes

I've built an AWS Lambda deployment package (using Node.js) that is executed when an object is PUT into a specified S3 bucket. I've configured the code that gets executed to, for now, add 10 randomly generated records into a Kinesis Firehose stream.

The Lambda function works just fine except for adding the records into the Kinesis stream. I'm seeing no error messages in the AWS CloudWatch logs. Using console.log() print statements, it seems that the putRecord() call isn't even executing and I can't figure out why. Can anyone else figure this out?

Here's the code that's part of my Lambda function deployment package:

// Logged once per cold start, when the Lambda container loads the module.
console.log('Loading function');

var aws = require('aws-sdk');
// S3 client for reading the object that triggered this function.
var s3 = new aws.S3({ apiVersion: '2006-03-01' });
var zlib = require('zlib'); // NOTE(review): imported but unused in the visible code

/**
 * Generates one random sensor reading and writes it to the Kinesis stream.
 *
 * NOTE(review): this uses the Kinesis Streams API (aws.Kinesis), not the
 * Firehose API. If "my_firehose" is a Firehose delivery stream, putRecord
 * fails with ResourceNotFoundException — Firehose is a separate API
 * (aws.Firehose) with a different putRecord parameter shape.
 *
 * @param {AWS.Kinesis} kinesis - initialized Kinesis client.
 * @param {function(Error|null, Object=)} [done] - optional completion
 *        callback invoked with putRecord's (err, data). Defaults to a
 *        no-op so existing callers are unaffected.
 */
function _writeToKinesis(kinesis, done) {
    done = done || function() {};

    var currTime = new Date().getMilliseconds();
    var sensor = 'sensor-' + Math.floor(Math.random() * 100000);
    var reading = Math.floor(Math.random() * 1000000);

    var record = JSON.stringify({
      time : currTime,
      sensor : sensor,
      reading : reading
    });

    console.log("record: " + record);

    var recordParams = {
      Data : record,
      PartitionKey : sensor,
      StreamName : "my_firehose"
    };

    // Stringify explicitly: "" + object prints "[object Object]", which is
    // what the original CloudWatch log showed.
    console.log("recordParams: " + JSON.stringify(recordParams));

    kinesis.putRecord(recordParams, function(err, data) {
      if (err) {
        console.log(err);
      }
      else {
        console.log('Successfully sent data to Kinesis.');
      }
      done(err, data);
    });
}

exports.handler = function(event, context) {
    //console.log('Received event:', JSON.stringify(event, null, 2));

    // Get the object from the event and show its content type
    var record = event.Records[0];
    var bucket = record.s3.bucket.name;
    var key = record.s3.object.key;
    var params = {
        Bucket: bucket,
        Key: key
    };

    s3.getObject(params, function(err, data) {
        if (err) {
            console.log(err);
            var message = "Error getting object " + key + " from bucket " + bucket +
                ". Make sure they exist and your bucket is in the same region as this function.";
            console.log(message);
            context.fail(message);
        } else {

            console.log('CONTENT TYPE:', data.ContentType);

            var kinesis = new aws.Kinesis({ apiVersion: '2013-12-02', region : "us-east-1"});

            var count = 0;
            while (count < 10) {
              setTimeout(_writeToKinesis(kinesis), 1000);
              count++;
            }

            context.succeed("OK");            
        }
    });
};

And here's the CloudWatch Log output:

START RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 Version: $LATEST
2015-10-18T20:13:59.743Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    CONTENT TYPE: application/zip
2015-10-18T20:13:59.861Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":860,"sensor":"sensor-12149","reading":146264}
2015-10-18T20:13:59.861Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:13:59.980Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":924,"sensor":"sensor-86345","reading":956735}
2015-10-18T20:13:59.980Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:13:59.982Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":982,"sensor":"sensor-4925","reading":822265}
2015-10-18T20:13:59.982Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.060Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":60,"sensor":"sensor-40822","reading":796150}
2015-10-18T20:14:00.060Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.061Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":61,"sensor":"sensor-92861","reading":855213}
2015-10-18T20:14:00.061Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.063Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":63,"sensor":"sensor-84324","reading":155159}
2015-10-18T20:14:00.063Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.121Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":121,"sensor":"sensor-54930","reading":365471}
2015-10-18T20:14:00.121Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.122Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":122,"sensor":"sensor-1330","reading":981637}
2015-10-18T20:14:00.122Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.123Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":123,"sensor":"sensor-92245","reading":634723}
2015-10-18T20:14:00.123Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.161Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":161,"sensor":"sensor-29594","reading":227706}
2015-10-18T20:14:00.161Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
END RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666
REPORT RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666  Duration: 1258.07 ms    Billed Duration: 1300 ms    Memory Size: 128 MB Max Memory Used: 15 MB

"OK"

P.S. I have an IAM Role associated with the Lambda function that is configured with policies to read from S3 as well as write to Kinesis.

1
I'm noticing that every couple of executions (not all of them), the log output is different and contains the following error: [ResourceNotFoundException: Stream my_firehose under account 8*********** not found.] I'm not sure why - the stream is created and is ACTIVE...littleK

1 Answer

0
votes

I figured out the problem when I tried performing a listStreams(). It was only printing out Kinesis streams, not Firehose Streams. I had assumed that, in the API, Firehose was under the Kinesis umbrella. Firehose is its own separate API, however.

Also, I ran into another problem that has a workaround posted here: Running AWS Firehose in lambda.js gives an undefined error. For now, if you're using the Firehose API with Lambda, you need to include the aws-sdk modules in your Lambda function deployment package (npm install aws-sdk). There is apparently a ticket created for Amazon to fix this.