
We're trying to develop a self-invoking lambda to process S3 files in chunks. The lambda role has the policies needed for the invocation attached.

Here's the code for the self-invoking lambda:


export const processFileHandler: Handler = async (
  event: S3CreateEvent,
  context: Context,
  callback: Callback,
) => {
  let bucket = loGet(event, 'Records[0].s3.bucket.name');
  let key = loGet(event, 'Records[0].s3.object.key');
  let totalFileSize = loGet(event, 'Records[0].s3.object.size');
  const lastPosition = loGet(event, 'position', 0);
  const nextRange = getNextSizeRange(lastPosition, totalFileSize);

  context.callbackWaitsForEmptyEventLoop = false;

  let data = await loadDataFromS3ByRange(bucket, key, nextRange);


  await database.connect();

  log.debug(`Successfully connected to the database`);

  const docs = await getParsedDocs(data, lastPosition);
  log.debug(`upserting ${docs.length} records to database`);
  if (docs.length) {
    try {
      // upserting logic
      log.debug(`total documents added: ${docs.length}`);
    } catch (err) {
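      // Log first so the error is not lost if the invocation below fails;
      // JSON.stringify on an Error object usually yields "{}", so log the error itself
      log.debug(`error inserting docs: ${err}`);
      await recurse(nextRange.end, event, context);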
    }
  }

  if (nextRange.end < totalFileSize) {
    log.debug(`${context.getRemainingTimeInMillis()} milliseconds left`);
    if (context.getRemainingTimeInMillis() < 6000) {
      log.debug(`Less than 6000 milliseconds left`);
      log.debug(`Invoking next iteration`);
      await recurse(nextRange.end, event, context);
      callback(null, {
        message: `Lambda timed out processing file, please continue from LAST_POSITION: ${nextRange.start}`,
      });
    }

  } else {
    callback(null, { message: `Successfully completed the chunk processing task` });
  }
};

Here, recurse invokes the same lambda. Everything else works as expected; the function only times out when execution reaches this invocation request:

const recurse = async (position: number, event: S3CreateEvent, context: Context) => {
  // Copy into a fresh object so the incoming event is not mutated
  let newEvent = Object.assign({}, event, { position });

  let request = {
    FunctionName: context.invokedFunctionArn,
    InvocationType: 'Event',
    Payload: JSON.stringify(newEvent),
  };

  let resp = await lambda.invoke(request).promise();
  console.log('Invocation complete', resp);

  return resp;
};

This is the stack trace logged to CloudWatch:

{
    "errorMessage": "connect ETIMEDOUT 63.32.72.196:443",
    "errorType": "NetworkingError",
    "stackTrace": [
        "Object._errnoException (util.js:1022:11)",
        "_exceptionWithHostPort (util.js:1044:20)",
        "TCPConnectWrap.afterConnect [as oncomplete] (net.js:1198:14)"
    ]
}

1 Answer


It's not a good idea to create a self-invoking Lambda function. If an error occurs (which could also be a failed handler call on the AWS side), the function might re-run several times, which makes it very hard to monitor and debug.

I would suggest using Step Functions. I believe the tutorial "Iterating a Loop Using Lambda" can help.
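
As a rough sketch of that loop pattern (not the tutorial's exact code): a Task state runs a Lambda that handles one chunk and returns the updated position plus a `continue` flag, and a Choice state loops back while the flag is true. The names `LoopState`, `iterateChunk`, and the state names are made up for illustration, `chunkSize` is an assumed value, and the Resource ARN is a placeholder.

```typescript
// Hypothetical loop state passed between Step Functions iterations.
interface LoopState {
  position: number;      // byte offset to resume from
  chunkSize: number;     // bytes handled per iteration (assumed value)
  totalFileSize: number; // size of the S3 object in bytes
}

interface LoopResult extends LoopState {
  continue: boolean; // read by the Choice state to decide whether to loop
}

// Advances the position by one chunk; the actual chunk processing
// (S3 range read, upsert) would happen before this returns.
function iterateChunk(state: LoopState): LoopResult {
  const position = Math.min(state.position + state.chunkSize, state.totalFileSize);
  return { ...state, position, continue: position < state.totalFileSize };
}

// Lambda entry point for the Task state; its return value becomes the state output.
const handler = async (state: LoopState): Promise<LoopResult> => iterateChunk(state);

// State machine definition in Amazon States Language, written as a plain object.
// The Resource ARN is a placeholder, not a real function.
const definition = {
  StartAt: 'ProcessChunk',
  States: {
    ProcessChunk: {
      Type: 'Task',
      Resource: 'arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME',
      Next: 'HasMoreChunks',
    },
    HasMoreChunks: {
      Type: 'Choice',
      Choices: [{ Variable: '$.continue', BooleanEquals: true, Next: 'ProcessChunk' }],
      Default: 'Done',
    },
    Done: { Type: 'Succeed' },
  },
};
```

Each iteration is then a separate, short Lambda run that shows up as its own step in the execution history, so a failed chunk is easier to spot than in a chain of self-invocations.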

Off the top of my head, if you'd prefer not to deal with Step Functions, you could create a Lambda trigger for an SQS queue. Then you send a message to the queue whenever you want to run the Lambda function again.
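
A minimal sketch of the queue approach, under some assumptions: the message shape `ChunkMessage`, the chunk size, and the helper names are all hypothetical, and `send` stands in for whatever SQS client call you use to put a message on the queue. With an SQS trigger, the message body would arrive in `Records[i].body` of the event.

```typescript
// Hypothetical message shape: where the file lives and where to resume from.
interface ChunkMessage {
  bucket: string;
  key: string;
  totalFileSize: number;
  position: number;
}

// Assumed chunk size; tune it to what one invocation can handle comfortably.
const CHUNK_SIZE = 1024 * 1024;

// End offset of the chunk that starts at msg.position.
function chunkEnd(msg: ChunkMessage, chunkSize: number): number {
  return Math.min(msg.position + chunkSize, msg.totalFileSize);
}

// Returns the message for the next chunk, or null when the file is finished.
function nextChunkMessage(msg: ChunkMessage, chunkSize: number = CHUNK_SIZE): ChunkMessage | null {
  const end = chunkEnd(msg, chunkSize);
  return end < msg.totalFileSize ? { ...msg, position: end } : null;
}

// Stand-ins for the real work: `processChunk` reads and upserts one byte range,
// `send` puts a message body on the queue.
type ProcessChunk = (msg: ChunkMessage, end: number) => Promise<void>;
type Send = (body: string) => Promise<void>;

// Handles one SQS record body: process the chunk, then queue the next one if any.
async function handleChunk(
  body: string,
  processChunk: ProcessChunk,
  send: Send,
  chunkSize: number = CHUNK_SIZE,
): Promise<void> {
  const msg: ChunkMessage = JSON.parse(body);
  await processChunk(msg, chunkEnd(msg, chunkSize));
  const next = nextChunkMessage(msg, chunkSize);
  if (next) {
    await send(JSON.stringify(next));
  }
}
```

The function never calls itself here; it only hands the next position to the queue, so retries and failures go through the queue's own handling rather than a chain of invocations.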