Thanks in advance for your time and response.

I have an AWS state machine with the following activities.

  1. Pull first available data file from an external FTP server
  2. Process the data (processing time can vary)
  3. Upload the processed data to another FTP server

I have a Java application running on an EC2 instance; it has 3 threads and polls the activities using the code shown below. The application invokes the appropriate workers to do the actual work for steps 1, 2 and 3. The important point is that all 3 activities must run on the same server, because the steps write to and read from a file location on that server.

I have hundreds of files to process on the FTP server, so I have 5 EC2 servers running copies of the Java application.

Now I start 5 executions of the state machine. This allows me to distribute the file processing across the 5 servers.

However, my problem is this:

How can I ensure that activities from a given state machine execution are handled by the SAME EC2 instance?

I don't want a given execution's activities to be handled by different EC2 instances. In the code below (from https://github.com/goosefraba/aws-step-function-activity-example/blob/master/src/main/java/at/goosefraba/ActivityProcessor.java), I don't see any way to make getActivityTask return only tasks belonging to a particular execution.

    // GetActivityTask long-polls for up to 60 seconds, so the socket timeout must be longer
    final ClientConfiguration clientConfiguration = new ClientConfiguration();
    clientConfiguration.setSocketTimeout((int) TimeUnit.SECONDS.toMillis(70));

    final AWSStepFunctions client = AWSStepFunctionsClientBuilder
            .standard()
            .withClientConfiguration(clientConfiguration)
            .build();

    while (true) {
        // Returns whichever task is pending for this activity ARN, from any execution
        GetActivityTaskResult getActivityTaskResult =
                client.getActivityTask(
                        new GetActivityTaskRequest().withActivityArn(getArn()));
        // A null token means the poll ended with no task available
        if (getActivityTaskResult.getTaskToken() != null) {
            // Do work
        }
    }

1 Answer

I consulted AWS technical support but didn't get an answer. I finally solved the problem with the following approach; hopefully it is useful to others.

  1. Start the state machine execution by supplying a unique JobId (which could be the name of the file from the FTP server, assuming it's unique).

    { "JobId": "MyUniqueJobId" }

The Java code is:

        final AWSStepFunctions client = AWSStepFunctionsClientBuilder
                .standard()
                .build();
        StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
        // StartReportGenerator execution
        startExecutionRequest.setStateMachineArn("arn:aws:states:us-east-1:xxxxxxx:stateMachine:poc");
        String uuid = UUID.randomUUID().toString();
        startExecutionRequest.setName(uuid);
        String inputJson = "{\"JobId\":\"MyUniqueJobId\"}";
        startExecutionRequest.setInput(inputJson);
        client.startExecution(startExecutionRequest);

  2. Upon retrieval of the activity task, read its input JSON and check whether the JobId equals the one I set when starting the state machine. If it is equal, I have picked up the activity belonging to the execution that I started. Otherwise, send a failure for the activity with the error "Retry".

    GetActivityTaskResult getActivityTaskResult =
            client.getActivityTask(
                    new GetActivityTaskRequest().withActivityArn("activityARN"));

    if (getActivityTaskResult.getTaskToken() != null) {
        log.info("Kicking off {} activity ...", getProcessName());
        final JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
        String jobId = json.get("JobId").textValue();
        if (!jobId.equals("MyUniqueJobId")) {
            // Not this worker's execution: fail the task with "Retry" so the state machine reschedules it
            log.error("Looking to retrieve JobId MyUniqueJobId, but found " + jobId + " instead; retrying");
            String errorMessage = "Retry";
            client.sendTaskFailure(
                    new SendTaskFailureRequest()
                            .withError(errorMessage)
                            .withTaskToken(getActivityTaskResult.getTaskToken()));
        } else {
            // JobId matches: do the actual work for this activity here
        }
    }
    
  3. The state machine JSON is constructed so that, if the activity fails with the error "Retry", the same activity is attempted again.

{
  "Comment": "State machine",
  "StartAt": "RunActivityOne",
  "States": {
    "RunActivityOne": {
      "Type": "Task",
      "TimeoutSeconds": 600,
      "ResultPath": "$.Result",
      "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityOne",
      "Catch": [
        {
          "ErrorEquals": [
            "Retry"
          ],
          "ResultPath": "$.Result",
          "Next": "RunActivityOne"
        },
        {
          "ErrorEquals": [
            "States.TaskFailed",
            "States.Timeout"
          ],
          "Next": "RunActivityOneFailure"
        }
      ],
      "Next": "RunActivityTwo"
    },
    "RunActivityOneFailure": {
      "Type": "Fail",
      "Cause": "RunActivityOneFailure",
      "Error": "RunActivityOneFailure"
    },
    "RunActivityTwo": {
      "Type": "Task",
      "TimeoutSeconds": 600,
      "ResultPath": "$.Result",
      "Resource": "arn:aws:states:us-east-1:xxxxxxx:activity:ActivityTwo",
      "Catch": [
        {
          "ErrorEquals": [
            "Retry"
          ],
          "ResultPath": "$.Result",
          "Next": "RunActivityTwo"
        },
        {
          "ErrorEquals": [
            "States.TaskFailed",
            "States.Timeout"
          ],
          "Next": "RunActivityTwoFailure"
        }
      ],
      "End": true
    },
    "RunActivityTwoFailure": {
      "Type": "Fail",
      "Cause": "RunActivityTwoFailure",
      "Error": "RunActivityTwoFailure"
    }
  }
}
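
The JobId check on the worker side can be pulled out into a small helper so that the owned JobId is a parameter instead of a hard-coded string, and a missing JobId is handled rather than causing a NullPointerException. This is only a sketch: the class name `JobIdFilter`, the `Decision` enum and both methods are hypothetical names of mine, and the regular expression is a simplification that keeps the example dependency-free (it only handles a plain string value without escape sequences). Real code would most likely keep parsing with Jackson as shown above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdFilter {

    /** What the worker should do with a task it has just polled. */
    public enum Decision { PROCESS, FAIL_WITH_RETRY }

    // Simplified matcher (assumption): a "JobId" string field whose value has no escapes.
    private static final Pattern JOB_ID =
            Pattern.compile("\"JobId\"\\s*:\\s*\"([^\"\\\\]*)\"");

    /** Returns the JobId found in the task input, or empty if none can be read. */
    public static Optional<String> extractJobId(String taskInputJson) {
        if (taskInputJson == null) {
            return Optional.empty();
        }
        Matcher m = JOB_ID.matcher(taskInputJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** PROCESS only when the task carries the JobId this worker owns. */
    public static Decision decide(String taskInputJson, String ownedJobId) {
        return extractJobId(taskInputJson)
                .filter(ownedJobId::equals)
                .map(id -> Decision.PROCESS)
                .orElse(Decision.FAIL_WITH_RETRY);
    }

    public static void main(String[] args) {
        // prints PROCESS
        System.out.println(decide("{\"JobId\":\"MyUniqueJobId\"}", "MyUniqueJobId"));
        // prints FAIL_WITH_RETRY
        System.out.println(decide("{\"JobId\":\"OtherJob\"}", "MyUniqueJobId"));
    }
}
```

On `FAIL_WITH_RETRY` the worker would call `sendTaskFailure` with the error "Retry", exactly as in the snippet above; on `PROCESS` it would do the work.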

This way, I will ultimately process only the activity that belongs to the execution that I started. The cons of this approach are:

  • We cannot say how many attempts it will take to eventually pick the right activity.
  • AWS charges based on the number of state transitions, so every retry adds to the cost.
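
To get a rough feel for both drawbacks, here is a back-of-the-envelope sketch. It rests on a strong assumption of mine, not on anything AWS documents: every time a task is handed out, it reaches each polling worker with equal probability, independently of earlier attempts. Under that model the number of deliveries until the right worker gets the task follows a geometric distribution with p = 1/workers, so its mean is simply the number of workers, and each rejected delivery is counted as one extra state transition. The class and method names are hypothetical.

```java
public class RetryCostEstimate {

    /**
     * Expected deliveries until the matching worker receives the task,
     * ASSUMING uniform, independent dispatch across all workers.
     * The mean of a geometric distribution with p = 1/workers is 1/p = workers.
     */
    public static double expectedDeliveries(int workers) {
        if (workers < 1) {
            throw new IllegalArgumentException("workers must be >= 1");
        }
        return workers;
    }

    /**
     * Expected extra transitions per execution: one per rejected delivery,
     * summed over all activity states in the state machine.
     */
    public static double expectedExtraTransitions(int workers, int activityStates) {
        return (expectedDeliveries(workers) - 1) * activityStates;
    }

    public static void main(String[] args) {
        // 5 servers, 2 activity states as in the state machine above: prints 8.0
        System.out.println(expectedExtraTransitions(5, 2));
    }
}
```

Real dispatch behaviour may differ (for example, workers that are busy processing are not polling), so treat the result as an order-of-magnitude estimate only.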