1
votes

I am learning the AWS services for a use case. After going through the docs I came up with a simple flow. I want to ingest data into Kinesis streams using the Streams API and the KPL. I use the example putRecord method to ingest data into the stream. This is the JSON I am ingesting -

{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}

Once the data is ingested I get the following response in putRecordResult -

Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}

Next I wrote a Lambda function to read this data and push it into a DynamoDB table. Here is my Lambda function -

console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();

exports.handler = (event, context, callback) => {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach((record) => {
        // Kinesis data is base64 encoded so decode here
        const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
        var userid = event.userid;
        var username = event.username;
        var firstname = event.firstname;
        console.log(userid + "," + username +","+ firstname);

        var item = {
            "userid" : userid,
            "username" : username,
            "firstname" : firstname
        };

        var params = {
            TableName : tableName,
            Item : item
        };
        console.log(params);

        db.putItem(params, function(err, data){
            if(err) console.log(err);
            else console.log(data);
        });

    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};

Somehow I am not able to see the console.log output from the Lambda function's execution. The streams page shows that there have been putRecord calls to the stream, and gets as well, but I can see nothing on the Lambda function page or in the DynamoDB table.

I have three IAM policies: one for the Java code that ingests the data into Kinesis, another for the Lambda function (lambda-kinesis-execution-role), and one for DynamoDB so that data can be written to the tables.

Is there a tutorial that shows how to do this the right way? I have a feeling that I am missing several points in this process, for example how to link all those IAM policies so that when data is put into the stream it is processed by Lambda and ends up in DynamoDB.

Any pointers and help are deeply appreciated.

2
Is your Lambda function being called at all? You don't mention it, so I'm wondering whether you've enabled the AWS Lambda event source that passes data from Kinesis to your function: docs.aws.amazon.com/lambda/latest/dg/… - devonlazarus
Thank you for the comment. Yes, I have added the Kinesis stream in the Event Sources tab of the Lambda function, and it shows State: enabled and Details: Batch size: 100, Last result: OK. When I configure a test event using the Kinesis sample event template and run the test, the log shows Item: {id: undefined, username:undefined, firstname:undefined} - Dan
If your code above is a direct copy of the code you're using, you're referencing event.userid but you should be using payload.userid. You've decoded the Kinesis record into the payload variable. - devonlazarus
Yes, you are correct. I also just found out that I need to use CloudWatch to see the output of the console.logs. Lambda had been receiving the data all along; I just had to look in the CloudWatch logs :) - Dan
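On the undefined values from the console test mentioned in the comments: as far as I recall, the stock Kinesis sample event carries a short base64-encoded text message rather than the JSON from the question, so the user fields cannot be found in it in any case. A minimal sketch of building a test event around the question's record follows. buildKinesisTestEvent is a hypothetical helper name, the partition key is a placeholder, and only the parts the handler reads (Records[].kinesis.data) are filled in; a real event carries more metadata.

```javascript
// Hypothetical helper: wraps plain objects in the minimal shape of a
// Kinesis event as seen by a Lambda handler. Real events carry more fields
// (eventID, eventSourceARN, ...); only what the handler reads is set here.
function buildKinesisTestEvent(payloads) {
    return {
        Records: payloads.map((payload, index) => ({
            eventSource: 'aws:kinesis',
            kinesis: {
                partitionKey: 'partitionKey-' + index, // placeholder value
                // Kinesis delivers record data to Lambda base64 encoded
                data: Buffer.from(JSON.stringify(payload), 'utf8').toString('base64')
            }
        }))
    };
}

const testEvent = buildKinesisTestEvent([
    { userid: 1234, username: 'jDoe', firstname: 'John', lastname: 'Doe' }
]);

// Print the event as JSON so it can be pasted into the console's test event editor.
console.log(JSON.stringify(testEvent, null, 2));
```

Running this locally with Node.js and pasting the printed JSON into the test event should exercise the same decoding path as a real stream record.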

2 Answers

1
votes

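If your code above is a direct copy of the code you're using, you're referencing event.userid, but the user fields live in the decoded Kinesis record, which you stored in the payload variable. Note that payload is still a JSON string at that point, so run it through JSON.parse first and read userid, username and firstname from the parsed object.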
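A minimal sketch of the corrected per-record logic, assuming each record carries the JSON document from the question: decode the base64 data, JSON.parse the resulting string, and read the fields from the parsed object. recordToItem and sampleRecord are hypothetical names used only for illustration.

```javascript
// Hypothetical helper: turns one Kinesis record into the item the question
// wants to store. The field names come from the JSON in the question.
function recordToItem(record) {
    // Kinesis data is base64 encoded; decoding yields a JSON *string*
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    // Parse the string before reading fields from it
    const user = JSON.parse(payload);
    return {
        userid: user.userid,
        username: user.username,
        firstname: user.firstname
    };
}

// Stand-in for one entry of event.Records, built from the question's JSON
const sampleRecord = {
    kinesis: {
        data: Buffer.from(
            '{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}'
        ).toString('base64')
    }
};

console.log(recordToItem(sampleRecord));
```

Inside the handler, the returned object could be used as the Item in the put parameters in place of the values read from event.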

0
votes

You can use a Lambda function:

1. Create an IAM role that covers both Kinesis and DynamoDB.

2. Create a Lambda function from the dynamodb-process-stream blueprint.

3. Select the execution role created in step 1.

4. Click Create Function, then go to the Edit code section and write the following code:

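const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});

exports.handler = (event, context, callback) => {
    // Build one put request per Kinesis record.
    const puts = event.Records.map((record) => {
        const params = {
            TableName: 'mytable', // DynamoDB table name
            Item: {
                ROWTIME: Date.now().toString(), // DynamoDB attribute name
                // Kinesis data is base64 encoded, so decode it here
                DATA: Buffer.from(record.kinesis.data, 'base64').toString('ascii') // DynamoDB attribute name
            }
        };
        return docClient.put(params).promise();
    });

    // Invoke the callback exactly once, after every put has finished.
    Promise.all(puts)
        .then(() => callback(null, `Successfully processed ${event.Records.length} records.`))
        .catch((err) => callback(err));
};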
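For the role in step 1, here is a rough sketch of the permissions the execution role might need, written as a JavaScript object so it can be printed as policy JSON. The account id, region and stream name are placeholders I made up, and the action list is my best understanding rather than a verified minimum; the AWS-managed AWSLambdaKinesisExecutionRole policy is probably a safer starting point for the Kinesis and logging parts.

```javascript
// Placeholder identifiers: replace with your own account, region and names.
const region = 'us-east-1';
const accountId = '123456789012'; // dummy account id
const streamName = 'mystream';    // hypothetical stream name
const tableName = 'mytable';      // table name used in the code above

const executionRolePolicy = {
    Version: '2012-10-17',
    Statement: [
        {   // read records from the stream that triggers the function
            Effect: 'Allow',
            Action: [
                'kinesis:DescribeStream',
                'kinesis:DescribeStreamSummary',
                'kinesis:GetShardIterator',
                'kinesis:GetRecords',
                'kinesis:ListShards'
            ],
            Resource: `arn:aws:kinesis:${region}:${accountId}:stream/${streamName}`
        },
        {   // listing streams is not scoped to a single stream
            Effect: 'Allow',
            Action: ['kinesis:ListStreams'],
            Resource: '*'
        },
        {   // write items to the target table
            Effect: 'Allow',
            Action: ['dynamodb:PutItem'],
            Resource: `arn:aws:dynamodb:${region}:${accountId}:table/${tableName}`
        },
        {   // let console.log output reach CloudWatch Logs
            Effect: 'Allow',
            Action: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
            Resource: '*'
        }
    ]
};

console.log(JSON.stringify(executionRolePolicy, null, 2));
```

The point of putting everything on one role is that the Lambda function runs under a single execution role, so that role needs read access to the stream, write access to the table, and permission to write logs. The Java producer keeps its own, separate credentials for putRecord.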