2
votes

So I have implemented an email system like the one described here: https://cloudonaut.io/integrate-sqs-and-lambda-serverless-architecture-for-asynchronous-workloads/

Flow is as follows

HTTP request to send an email -> API Gateway -> HttpRequestLambda -> SQS <-> SQSMessageConsumerLambda (scheduled) -> MessageWorkerLambda (sends the email via an email service provider)

My SQSMessageConsumerLambda is scheduled to run every minute

I changed the SQS consumer to call itself recursively when the timeout is getting near, rather than just ending. This means the SQS queue has a better chance of not piling up with too many messages.

This seems to work great so far, but I have a couple of questions:

1. If the function times out, the messages that were read from the queue are probably still within their visibility timeout period. Invoking the lambda recursively therefore means they can't actually be re-read from the queue until their visibility timeout expires, which is unlikely to have happened immediately after the recursive call. So would it be a good idea to pass these messages into the recursive call itself? The consumer lambda could then check for these 'passed-in messages' at the beginning and send them directly to the workers.

2. Isn't SQSMessageConsumerLambda still a bit of a bottleneck? It takes about 40-50 ms to invoke the MessageWorkerLambda for each message it delegates. Or does `async.parallel` mitigate this?

3. Would it be better to elastically increase the number of SQSMessageConsumerLambda instances based on CloudWatch alarms, i.e. alarms that fire when there are more than X messages on the queue for Y minutes?

var AWS = require('aws-sdk');

var sqs = new AWS.SQS();

var async = require("async");

var lambda = new AWS.Lambda();

// URL of the task queue, assembled from the REGION, ACCOUNT_ID and STAGE environment variables.
const QUEUE_URL = `https://sqs.${process.env.REGION}.amazonaws.com/${process.env.ACCOUNT_ID}/${process.env.STAGE}-emailtaskqueue`;

// Name of the worker Lambda that each received queue message is handed to (see invokeWorkerLambda).
const EMAIL_WORKER = `${process.env.SERVICE}-${process.env.STAGE}-emailWorker`;

// Name of this consumer Lambda; used when it re-invokes itself near its timeout.
const THIS_LAMBDA = `${process.env.SERVICE}-${process.env.STAGE}-emailTaskConsumer`;

/**
 * Long-polls the task queue once for a batch of up to 10 messages.
 *
 * @param {function(Error|null, Array=)} callback - called with the SQS error, or with
 *   null plus `data.Messages` from the SQS response (which may be undefined when the
 *   queue returned nothing).
 */
function receiveMessages(callback) {
    const params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10,
        // Long polling: wait up to 20 s for a message to arrive before returning empty-handed.
        WaitTimeSeconds: 20
    };

    sqs.receiveMessage(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);
            callback(err);
            return;
        }

        const batch = data.Messages;
        const isEmpty = !batch || batch.length === 0;
        if (isEmpty) {
            console.log('Got no messages from queue');
        } else {
            console.log('Got ', batch.length, ' messages off the queue');
        }
        callback(null, batch);
    });
}


/**
 * Hands one SQS message to the email worker Lambda (EMAIL_WORKER) as an asynchronous
 * ('Event') invocation.
 *
 * The worker's payload is `{ ReceiptHandle, body }`, where `body` is the message's Body
 * parsed as JSON. The receipt handle is forwarded presumably so the worker can delete
 * the message once it is processed — TODO confirm against the worker.
 *
 * @param {{ReceiptHandle: string, Body: string}} task - one message from SQS receiveMessage.
 * @param {function(Error|null, object=)} callback - called with the JSON-parse or invoke
 *   error, or with null plus the Lambda.invoke response.
 */
function invokeWorkerLambda(task, callback) {
    console.log('Need to invoke worker for this task..', task);

    // A malformed Body used to throw synchronously out of this function (and so out of the
    // SQS receive callback that drives it), bypassing the caller's error handling.
    // Report it through the callback instead.
    let body;
    try {
        body = JSON.parse(task.Body);
    } catch (parseErr) {
        console.error(parseErr, parseErr.stack);
        callback(parseErr);
        return;
    }

    const payload = {
        "ReceiptHandle": task.ReceiptHandle,
        "body": body
    };
    console.log('payload:', payload);

    // InvocationType 'Event' = asynchronous invoke: the call returns once Lambda has queued
    // the event, without waiting for the worker to finish.
    // (http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property)
    const params = {
        FunctionName: EMAIL_WORKER,
        InvocationType: 'Event',
        Payload: JSON.stringify(payload)
    };

    const startedAt = Date.now();
    lambda.invoke(params, function(err, data) {
        console.log('took ', Date.now() - startedAt, ' to invoke ', EMAIL_WORKER, ' asynchronously');
        if (err) {
            console.error(err, err.stack);
            callback(err);
            return;
        }
        callback(null, data);
    });
}

/**
 * Drains the task queue in a loop.
 *
 * Each round receives a batch, dispatches every message to the worker Lambda in
 * parallel, and repeats while more than 20 000 ms of execution time remain. When less
 * time remains, it asynchronously re-invokes this Lambda (THIS_LAMBDA) with
 * `{"recursiveMarker":true}` so draining continues in a fresh invocation.
 *
 * @param {object} context - Lambda context; only getRemainingTimeInMillis() is used.
 * @param {function(Error|null, string=)} callback - called with an error if receiving,
 *   dispatching, or the self-invocation fails. Otherwise it is called with "DONE" once a
 *   receive returns no messages, or with 'Lambda was just called recursively' after
 *   handing off.
 */
function handleSQSMessages(context, callback) {
    receiveMessages(function(receiveErr, messages) {
        // A receive failure used to fall through to "DONE" and be reported as success.
        if (receiveErr) {
            callback(receiveErr);
            return;
        }
        if (!messages || messages.length === 0) {
            callback(null, "DONE");
            return;
        }

        const invocations = messages.map(function(message) {
            return function(done) {
                invokeWorkerLambda(message, done);
            };
        });

        async.parallel(invocations, function(dispatchErr) {
            if (dispatchErr) {
                console.error(dispatchErr, dispatchErr.stack);
                callback(dispatchErr);
                return;
            }

            // NOTE(review): the next receive may long-poll for up to 20 s (WaitTimeSeconds: 20),
            // which can consume nearly all of this 20 000 ms margin before the batch is
            // dispatched; consider a larger margin.
            if (context.getRemainingTimeInMillis() > 20000) {
                console.log('there is more time to read more messages for this run of the cron');
                handleSQSMessages(context, callback);
                return;
            }

            console.log('remaining time in millis:', context.getRemainingTimeInMillis(), ' No more time here, invoking this lambda again');

            // Asynchronous self-invocation: this run finishes while the new one keeps draining.
            const selfInvokeParams = {
                FunctionName: THIS_LAMBDA,
                InvocationType: 'Event',
                Payload: '{"recursiveMarker":true}'
            };
            lambda.invoke(selfInvokeParams, function(invokeErr, data) {
                if (invokeErr) {
                    console.error(invokeErr, invokeErr.stack);
                    callback(invokeErr);
                    return;
                }
                console.log('data from the invocation:', data);
                callback(null, 'Lambda was just called recursively');
            });
        });
    });
}

/**
 * Lambda handler for the email task consumer.
 *
 * Logs the triggering event and then drains the task queue via handleSQSMessages.
 * When this function re-invokes itself, the event is `{"recursiveMarker":true}`.
 * Other triggers (e.g. the schedule) send whatever event their source produces.
 *
 * @param {object} event - the invocation event; only logged here.
 * @param {object} context - Lambda context, forwarded to handleSQSMessages.
 * @param {function} callback - Lambda completion callback, forwarded to handleSQSMessages.
 */
module.exports.emailTaskConsumer = (event, context, callback) => {
    console.log('in an emailTaskConsumer. Was this a recursive call ?', event);
    handleSQSMessages(context, callback);
};
2

2 Answers

1
votes

1) The visibility timeout is a great feature of SQS that lets you build resilient systems. I can't see a reason to try to handle these failures yourself.

2) You could send all the messages read from the queue to the Worker Lambda as one batch and process them at once.

3) You could add more CloudWatch Events rules that trigger the Consumer Lambda, to increase read throughput.

0
votes

Use SNS to trigger the Lambda. This is the correct way of working with Lambda functions. Your HttpRequestLambda would publish an SNS notification, and another Lambda function would immediately be triggered to respond to that event. In fact, if you are not doing anything else in HttpRequestLambda, you can replace it with an API Gateway AWS service proxy. Here you can see a full tutorial about exposing the SNS API via API Gateway.