
Currently I'm using Airflow with the CeleryExecutor + Redis to run DAGs, and I have set execution_timeout to 12 hours in an S3 key sensor, but it fails after one hour on each retry.

I have tried updating visibility_timeout = 64800 in airflow.cfg, but the issue still exists.
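
For reference, here is a quick sanity check I can run to confirm the value Airflow actually loads (a minimal sketch, assuming Airflow 1.10.x, where the option lives under the [celery_broker_transport_options] section of airflow.cfg):

from airflow.configuration import conf

# Print the visibility_timeout that the Celery broker transport will use.
# If this doesn't print 64800, the setting isn't being picked up from airflow.cfg.
print(conf.getint('celery_broker_transport_options', 'visibility_timeout'))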

file_sensor = CorrectedS3KeySensor(
    task_id='listen_for_file_drop',
    dag=dag,
    aws_conn_id='aws_default',
    poke_interval=15,
    timeout=64800,  # 18 hours (sensor-level timeout)
    bucket_name=EnvironmentConfigs.S3_SFTP_BUCKET_NAME,
    bucket_key=dag_config[ConfigurationConstants.FILE_S3_PATTERN],
    wildcard_match=True,
    execution_timeout=timedelta(hours=12)
)

My understanding is that execution_timeout should let the task run for 12 hours per attempt, across a total of four attempts (retries = 3). But the issue is that each retry fails after about an hour, so the task only lasts a bit over 4 hours in total.
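
To make that expectation concrete, here is a minimal sketch of the surrounding DAG setup as I understand it (the dag_id, start_date and retry_delay are illustrative placeholders; retries = 3 is the part that matters):

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'start_date': datetime(2019, 8, 1),   # illustrative
    'retries': 3,                         # 1 run + 3 retries = 4 attempts
    'retry_delay': timedelta(minutes=5),  # illustrative
}

dag = DAG(
    dag_id='file_drop_dag',               # illustrative name
    default_args=default_args,
    schedule_interval=None,
)

# With execution_timeout=timedelta(hours=12) on the sensor task above:
# Expected: each of the 4 attempts can run up to 12 hours before
#           AirflowTaskTimeout, i.e. up to ~48 hours in total.
# Observed: every attempt dies after roughly 1 hour instead.

This is the error I see in the task log on every attempt: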

[2019-08-06 13:00:08,597] {{base_task_runner.py:101}} INFO - Job 9: Subtask listen_for_file_drop [2019-08-06 13:00:08,595] {{timeout.py:41}} ERROR - Process timed out
[2019-08-06 13:00:08,612] {{models.py:1788}} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1652, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 97, in execute
    while not self.poke(context):
  File "/usr/local/airflow/dags/ProcessingStage/sensors/sensors.py", line 91, in poke
    time.sleep(30)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 42, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout


1 Answer


I figured it out a few days ago.

Since I'm deploying Airflow on AWS with the Celery executor, there were a few improperly configured CloudWatch alarms that kept scaling the workers and the webserver/scheduler up and down :(

After updating those alarms, it works well now!!