My Airflow script has only one task to trigger a glue job. I am able to create the DAG. Below is my code for DAG.
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from datetime import datetime, timedelta
### glue job specific variables
glue_job_name = "my_glue_job"
glue_iam_role = "AWSGlueServiceRole"
region_name = "us-west-2"
email_recipient = "[email protected]"
default_args = {
'owner': 'me',
'start_date': datetime(2020, 1, 1),
'retry_delay': timedelta(minutes=5),
'email': email_recipient,
'email_on_failure': True
}
with DAG(dag_id = 'glue_af_pipeline', default_args = default_args, schedule_interval = None) as dag:
glue_job_step = AwsGlueJobOperator(
job_name =glue_job_name,
script_location = 's3://my-s3-location',
region_name = region_name,
iam_role_name = glue_iam_role,
script_args=None,
num_of_dpus=10,
task_id = 'glue_job_step',
dag = dag
)
glue_job_step
When I run the DAG it fails and gives the below error:
[2020-10-13 08:27:14,315] {logging_mixin.py:112} INFO - [2020-10-13 08:27:14,315] {glue.py:114} ERROR - Failed to run aws glue job, error: Parameter validation failed: Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'> [2020-10-13 08:27:14,315] {taskinstance.py:1058} ERROR - Parameter validation failed: Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'> Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task result = task_copy.execute(context=context) File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/glue.py", line 115, in execute glue_job_run = glue_job.initialize_job(self.script_args) File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 111, in initialize_job job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments) File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 337, in _api_call return self._make_api_call(operation_name, kwargs) File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 628, in _make_api_call request_dict = self._convert_to_request_dict( File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 676, in _convert_to_request_dict request_dict = self._serializer.serialize_to_request( File "/usr/local/lib/python3.8/site-packages/botocore/validate.py", line 297, in serialize_to_request raise ParamValidationError(report=report.generate_report()) botocore.exceptions.ParamValidationError: Parameter validation failed: Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'> [2020-10-13 08:27:14,316] {taskinstance.py:1089} INFO - Marking task as FAILED.
Any suggestions appreciated.