My Airflow DAG has a single task that triggers an AWS Glue job. The DAG is created successfully. Here is the DAG code:

from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from datetime import datetime, timedelta


### glue job specific variables
glue_job_name = "my_glue_job"
glue_iam_role = "AWSGlueServiceRole"
region_name = "us-west-2"
email_recipient = "[email protected]"

default_args = {
    'owner': 'me',
    'start_date': datetime(2020, 1, 1),
    'retry_delay': timedelta(minutes=5),
    'email': email_recipient,
    'email_on_failure': True
}


with DAG(dag_id='glue_af_pipeline', default_args=default_args, schedule_interval=None) as dag:

    glue_job_step = AwsGlueJobOperator(
        job_name=glue_job_name,
        script_location='s3://my-s3-location',
        region_name=region_name,
        iam_role_name=glue_iam_role,
        script_args=None,
        num_of_dpus=10,
        task_id='glue_job_step',
        dag=dag
    )

    glue_job_step

When I run the DAG, the task fails with the following error:

[2020-10-13 08:27:14,315] {logging_mixin.py:112} INFO - [2020-10-13 08:27:14,315] {glue.py:114} ERROR - Failed to run aws glue job, error: Parameter validation failed: Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'>
[2020-10-13 08:27:14,315] {taskinstance.py:1058} ERROR - Parameter validation failed: Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/glue.py", line 115, in execute
    glue_job_run = glue_job.initialize_job(self.script_args)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 111, in initialize_job
    job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 337, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 628, in _make_api_call
    request_dict = self._convert_to_request_dict(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 676, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
  File "/usr/local/lib/python3.8/site-packages/botocore/validate.py", line 297, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed: Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'>
[2020-10-13 08:27:14,316] {taskinstance.py:1089} INFO - Marking task as FAILED.

Any suggestions appreciated.

1 Answer

If you are running an existing Glue job, try this:

glue_job_step = AwsGlueJobOperator(
    task_id="glue_job_step",
    job_name=glue_job_name,
    job_desc=f"triggering glue job {glue_job_name}",
    region_name=region_name,
    iam_role_name=glue_iam_role,
    num_of_dpus=1,
    dag=dag,
)

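Remove script_args if the job takes no input arguments. If it does take arguments, pass them as a dict: the error shows that the Glue API rejects a list for the Arguments parameter.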
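
If the job does need arguments, one way to avoid the type error is to build the dict up front and check it before handing it to the operator. The sketch below is only an illustration: build_glue_script_args is a hypothetical helper (not part of Airflow or boto3), and the argument names and the S3 path are made up. It relies on two things Glue expects: argument names prefixed with "--" and string values.

```python
from collections.abc import Mapping
from typing import Dict, Optional


def build_glue_script_args(args: Optional[Dict[str, object]] = None) -> Dict[str, str]:
    """Hypothetical helper: return Glue job arguments as a dict of strings."""
    if args is None:
        # No arguments: an empty dict is a valid type, unlike a list.
        return {}
    if not isinstance(args, Mapping):
        # Fail early with a clear message instead of deep inside botocore.
        raise TypeError(f"script_args must be a dict, got {type(args).__name__}")
    normalized = {}
    for key, value in args.items():
        # Glue job argument names are conventionally prefixed with "--".
        name = key if key.startswith("--") else f"--{key}"
        normalized[name] = str(value)
    return normalized


# Assumed example arguments; replace with whatever the Glue script reads.
script_args = build_glue_script_args(
    {"input_path": "s3://my-s3-location/input", "--run_date": "2020-10-13"}
)
print(script_args)
```

The result can then be passed as script_args=script_args when constructing the operator, in place of script_args=None.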