3
votes

I am trying to start an AWS EMR cluster and submit a step using EmrCreateJobFlowOperator and EmrAddStepsOperator. Both of my tasks succeed, but the cluster is never launched, not even without a step.

Both tasks change to the succeeded status.

Here is my code

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'end_date': datetime(2019, 2, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


step_args = ["spark-submit", '../test.py']

step = [{
    'Name': 'what_you_do-' + time.strftime('%Y%m%d-%H:%M'),
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
        'Args': step_args
    }
}]



JOB_FLOW_OVERRIDES = {
    'Instances': {
        'InstanceGroups': [
            {
                'InstanceRole': 'MASTER',
                'InstanceType': 'm4.large',
                'InstanceCount': 1
            },
            {
                'InstanceRole': 'CORE',
                'InstanceType': 'm4.large',
                'InstanceCount': 2
            }
        ]
    },
    'Name': 'airflow-monthly_agg_custom',
    'BootstrapActions': [{
        'Name': 'Install',
        'ScriptBootstrapAction': {
            'Path': 's3://dep-buck/bootstrap.sh'
        }
    }],
    'Configurations': [
        {
            'Classification': 'spark-env',
            'Configurations': [
                {
                    'Classification': 'export',
                    'Properties': {
                        'PYSPARK_PYTHON': '/usr/bin/python3'
                    }
                }
            ]
        }
    ]
}

dag = DAG('emr_job_flow_automatic_steps_7',
          default_args=default_args,
          schedule_interval="@daily",
          max_active_runs=1,

          #    schedule_interval='*/1 * * * *',

          catchup=True,

          #         dagrun_timeout=timedelta(seconds=10)
          )

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=step,
    dag=dag
)

cluster_creator.set_downstream(step_adder)

I have tried to search for examples or good documentation, but there isn't much apart from the function definitions on the Airflow site.

For the "create job flow" task I have this log repeated several times:

[screenshot of the create job flow task log]

for "add step" i have this in log

[screenshot of the add steps task log]


1 Answer

3
votes

The problem was mainly about visibility to all users and the region: the cluster was being started in the default region, so I had to change the properties below.

Airflow UI > Admin > Connections > aws_default > Extra

{"region_name": "the region i was watching the ec2 console"}

Airflow UI > Admin > Connections > emr_default > Extra

"VisibleToAllUsers": true,