2
votes

I am very new to AWS Step Functions and AWS Lambda functions and could really use some help. I have a state machine where I am trying to check whether a certain file exists in my S3 bucket, then have the state machine follow one path if the file exists and a different path if it does not.

The following shows the beginning of my state machine definition that covers this issue:

{
  "Comment": "This is a test for running the structure of the CustomCreate job.",
  "StartAt": "PreStep",
  "States": {
    "PreStep": {
      "Comment": "Check that all the necessary files exist before running the job.",
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:XXXXXXXXXX:function:CustomCreate-PreStep-Function",
      "Next": "Run Job Choice"
    },
    "Run Job Choice": {
      "Comment": "This step chooses whether or not to go forward with running the main job.",
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.FoundNecessaryFiles",
          "BooleanEquals": true,
          "Next": "Spin Up Cluster"
        },
        {
          "Variable": "$.FoundNecessaryFiles",
          "BooleanEquals": false,
          "Next": "Do Not Run Job"
        }
      ]
    },
    "Do Not Run Job": {
      "Comment": "This step triggers if the PreStep fails and the job should not run.",
      "Type": "Fail",
      "Cause": "PreStep unsuccessful"
    },
    "Spin Up Cluster": {
      "Comment": "Spins up the EMR Cluster.",
      "Type": "Pass",
      "Next": "Update Env"
    },
    "Update Env": {
      "Comment": "Update the environment variables in the EMR Cluster.",
      "Type": "Pass",
      "Next": "Run Job"
    },
    "Run Job": {
      "Comment": "Add steps to the EMR Cluster.",
      "Type": "Pass",
      "End": true
    }
  }
}

The following code depicts my CustomCreate-PreStep-Function Lambda function:

exports.handler = async function(event, context, callback) {
     var AWS = require('aws-sdk');
     var s3 = new AWS.S3();
     var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
     s3.getObject(params, function(err, data) {

        if (err) {
            console.log(err, err.stack);
            // file does not exist
            console.log("failed");
            callback(null,false);
        }
        else {
            console.log(data);
            // file exists
            console.log("succeeded");
            var FoundNecessaryFiles = true;
            // return FoundNecessaryFiles;
            callback(null,event.FoundNecessaryFiles=true);
        }    
    });
};

I have tried this in a number of ways but have been unable to get it working. As you can see, I am trying to use the Lambda function to pass back the variable FoundNecessaryFiles with a value of true/false depending on whether the file was found, and to use that value to guide the choice in the next step. I would prefer to solve this by fixing the variable pass-back, since I may need to use that method again later in the state machine, but I would also accept another solution, whether that means combining the steps or whatever else may work.

Also, my next step in this process will be to spin up an AWS EMR cluster, provided that the proper files exist, which I am also very unclear on how to accomplish. I would be very appreciative if anyone were able to provide any assistance in running an AWS EMR cluster using Step Functions as well.


2 Answers

1
votes

I solved my initial problem of passing the variable; however, I could still really use some help getting an EMR cluster running through Step Functions.

For those of you who may encounter a similar problem, I solved my variable-passing issue by changing my Lambda function to the following:

exports.handler = function(event, context, callback) {
     var AWS = require('aws-sdk');
     var s3 = new AWS.S3();
     var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
     s3.getObject(params, function(err, data) {

        if (err) {
            console.log(err, err.stack);
            // file does not exist
            console.log("failed");
            event.FoundNecessaryFiles = false;
            callback(null,event);
        }
        else {
            console.log(data);
            // file exists
            console.log("succeeded");
            event.FoundNecessaryFiles = true;
            callback(null,event);
        }    
    });
};
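The key point for anyone hitting the same wall: the Choice state reads $.FoundNecessaryFiles from the task's output, so the handler has to hand back the whole event object with the flag merged in, not a bare boolean. The merge itself can be isolated as a small pure helper (withFoundFlag is a name I made up for illustration, not part of any AWS API):

```javascript
// Hypothetical helper: copy the incoming Step Functions event and set the
// FoundNecessaryFiles flag on it, so a downstream Choice state can test
// $.FoundNecessaryFiles without losing the rest of the state machine input.
function withFoundFlag(event, found) {
  return Object.assign({}, event, { FoundNecessaryFiles: Boolean(found) });
}
```

With a helper like this, the two branches of the getObject callback above collapse to a single `callback(null, withFoundFlag(event, !err));`.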

My next issue is to set up an AWS EMR cluster. The workflow I have in mind:

  1. Spin up an EMR cluster. This could be done directly in the Step Function JSON or, preferably, from a JSON cluster config file I have located in my S3 bucket.
  2. Update the EMR cluster environment variables. I have a .sh script in my S3 bucket that can do this; I just do not know how to apply it to the EMR cluster using Step Functions.
  3. Add a step containing a spark-submit command to the EMR cluster. This command is described in a JSON config file in my S3 bucket, which can be uploaded to the cluster in a similar manner to the environment config file in the previous step.
  4. Make sure the EMR cluster terminates after it completes its run.
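Step Functions has a direct Amazon EMR service integration, so the cluster can be created (and a bootstrap script run from S3) without a Lambda in between. A rough sketch of such a state, assuming default EMR roles and placeholder bucket/script names of my own invention; the Parameters mirror the EMR RunJobFlow API, so any RunJobFlow field should work here:

```json
"Spin Up Cluster": {
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
  "Parameters": {
    "Name": "CustomCreateCluster",
    "ReleaseLabel": "emr-5.30.0",
    "Applications": [ { "Name": "Spark" } ],
    "ServiceRole": "EMR_DefaultRole",
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "Instances": {
      "MasterInstanceType": "m5.xlarge",
      "SlaveInstanceType": "m5.xlarge",
      "InstanceCount": 3,
      "KeepJobFlowAliveWhenNoSteps": true
    },
    "BootstrapActions": [
      {
        "Name": "SetEnvVars",
        "ScriptBootstrapAction": { "Path": "s3://BUCKET_NAME/env-setup.sh" }
      }
    ]
  },
  "ResultPath": "$.CreateClusterResult",
  "Next": "Run Job"
}
```

The bootstrap action covers the environment-variable script, since bootstrap scripts run on every node as part of cluster creation; the .sync suffix makes the state wait until the cluster is fully up before moving on.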

Any help on this would be greatly appreciated, whether you follow the structure I outlined above or know of a solution that alters the structure, I would be happy to take any assistance.

0
votes

This might be helpful to someone, as new features have been introduced to AWS Step Functions.

  1. To set up environment variables or install software on the EMR cluster, use bootstrap actions. You can provide an S3 path to a bootstrap script, which runs as part of the cluster creation process.

  2. One can add an EMR step (synchronous or asynchronous) through Step Functions, building the spark-submit command as an Args array. Here is an example:

ARGS_ARRAY = ["spark-submit",
              "--master=yarn",
              "--deploy-mode=cluster",
              "--name=Generate Bulk File",
              "--driver-cores=1",
              "--driver-memory=2g",
              "--executor-cores=5",
              "--executor-memory=9g",
              "/home/hadoop/charu/mySparkJob.py",
              input_file_name,
              output_file_name]
  3. One can add the last step in the workflow as 'Terminate EMR Cluster', dependent on the EMR step before it: the cluster is terminated only if the preceding synchronous EMR step finishes successfully (assuming that's what one wants).
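Tying points 2 and 3 together: with the direct EMR service integration, an args array like the one above can be passed to command-runner.jar in an addStep.sync task, followed by a terminate task. A sketch, assuming the cluster ID was saved under $.CreateClusterResult by an earlier createCluster.sync state (state names and paths here are illustrative, not prescribed):

```json
"Run Job": {
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
  "Parameters": {
    "ClusterId.$": "$.CreateClusterResult.ClusterId",
    "Step": {
      "Name": "Generate Bulk File",
      "ActionOnFailure": "CONTINUE",
      "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [ "spark-submit",
                  "--master=yarn",
                  "--deploy-mode=cluster",
                  "/home/hadoop/charu/mySparkJob.py" ]
      }
    }
  },
  "ResultPath": null,
  "Next": "Terminate Cluster"
},
"Terminate Cluster": {
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",
  "Parameters": { "ClusterId.$": "$.CreateClusterResult.ClusterId" },
  "End": true
}
```

Because "Run Job" uses the .sync integration, the "Terminate Cluster" state only runs after the Spark step completes; adding the terminate state in a Catch handler as well would ensure the cluster is torn down on failure too.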