11
votes

Python 2.7.12

boto3==1.3.1

How can I add a step to a running EMR cluster and have the cluster terminated after the step is complete, regardless of it fails or succeeds?

Create the cluster

response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)

Add a step

response = client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[
        {
            'Name': 'Run Step',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Args': [
                    'spark-submit',
                    '--deploy-mode', 'cluster',
                    '--py-files',
                    's3://mybucket/code/spark/spark_udfs.py',
                    's3://mybucket/code/spark/{}'.format(spark_script),
                    '--some-arg'
                ],
                'Jar': 'command-runner.jar'
            }
        }
    ]
)

This successfully adds a step and runs, however, when the step completes successfully, I would like the cluster to auto-terminate as noted in the AWS CLI: http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html

3

3 Answers

7
votes

In your case (creating the cluster using boto3) you can add these flags 'TerminationProtected': False, 'AutoTerminate': True, to your cluster creation. In this way after your step finished to run the cluster will be shut-down.

Another solution is to add another step to kill the cluster immediately after the step that you want to run. So basically you need to run this command as step

aws emr terminate-clusters --cluster-ids your_cluster_id

The tricky part is to retrive the cluster_id. Here you can find some solution: Does an EMR master node know it's cluster id?

0
votes

The 'AutoTerminate': True parameter as suggested did not work for me. However, it worked when I set the parameter 'KeepJobFlowAliveWhenNoSteps' from True to False. Your Code should look then as the following:

response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': False,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)
0
votes

You can create a short-lived cluster that automatically terminates after all steps have been run by specifying 'KeepJobFlowAliveWhenNoSteps': False in the Instances param. I've added a complete example to GitHub that shows how to do this.

Here's some of the code from the demo:

def run_job_flow(
        name, log_uri, keep_alive, applications, job_flow_role, service_role,
        security_groups, steps, emr_client):
    try:
        response = emr_client.run_job_flow(
            Name=name,
            LogUri=log_uri,
            ReleaseLabel='emr-5.30.1',
            Instances={
                'MasterInstanceType': 'm5.xlarge',
                'SlaveInstanceType': 'm5.xlarge',
                'InstanceCount': 3,
                'KeepJobFlowAliveWhenNoSteps': keep_alive,
                'EmrManagedMasterSecurityGroup': security_groups['manager'].id,
                'EmrManagedSlaveSecurityGroup': security_groups['worker'].id,
            },
            Steps=[{
                'Name': step['name'],
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit', '--deploy-mode', 'cluster',
                             step['script_uri'], *step['script_args']]
                }
            } for step in steps],
            Applications=[{
                'Name': app
            } for app in applications],
            JobFlowRole=job_flow_role.name,
            ServiceRole=service_role.name,
            EbsRootVolumeSize=10,
            VisibleToAllUsers=True
        )
        cluster_id = response['JobFlowId']
        logger.info("Created cluster %s.", cluster_id)
    except ClientError:
        logger.exception("Couldn't create cluster.")
        raise
    else:
        return cluster_id

And here's some code that calls this function with some real params:

output_prefix = 'pi-calc-output'
pi_step = {
    'name': 'estimate-pi-step',
    'script_uri': f's3://{bucket_name}/{script_key}',
    'script_args':
        ['--partitions', '3', '--output_uri',
        f's3://{bucket_name}/{output_prefix}']
}
cluster_id = emr_basics.run_job_flow(
    f'{prefix}-cluster', f's3://{bucket_name}/logs',
    False, ['Hadoop', 'Hive', 'Spark'], job_flow_role, service_role,
    security_groups, [pi_step], emr_client)