I'm currently running Airflow with the Celery executor and Redis to run my DAGs. I've set execution_timeout to 12 hours on an S3 key sensor, but each retry fails after about one hour.
I've tried setting visibility_timeout = 64800 in airflow.cfg, but the issue persists.
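Roughly, the change in airflow.cfg looks like this (shown here under the [celery_broker_transport_options] section, which is where the default config documents this option; adjust if yours differs):

[celery_broker_transport_options]
# seconds before Redis re-delivers an unacknowledged Celery task (18 hours)
visibility_timeout = 64800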
from datetime import timedelta

file_sensor = CorrectedS3KeySensor(
    task_id='listen_for_file_drop',
    dag=dag,
    aws_conn_id='aws_default',
    poke_interval=15,                       # check S3 every 15 seconds
    timeout=64800,                          # sensor timeout: 18 hours
    bucket_name=EnvironmentConfigs.S3_SFTP_BUCKET_NAME,
    bucket_key=dag_config[ConfigurationConstants.FILE_S3_PATTERN],
    wildcard_match=True,
    execution_timeout=timedelta(hours=12),  # hard per-attempt timeout: 12 hours
)
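The retries mentioned below come from the DAG's default_args; here is a minimal sketch with placeholder values (only retries = 3 matches my real setup):

from datetime import timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',                   # placeholder
    'retries': 3,                         # four attempts in total
    'retry_delay': timedelta(minutes=5),  # placeholder value
}

dag = DAG(
    'file_drop_processing',               # placeholder DAG id
    default_args=default_args,
    schedule_interval=None,               # placeholder schedule
)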
My understanding of execution_timeout is that each attempt should be allowed to run for up to 12 hours, across four total runs (retries = 3). Instead, each retry fails after roughly an hour, so the sensor only survives a bit over 4 hours in total. Here is the traceback from one of the failed attempts:
[2019-08-06 13:00:08,597] {{base_task_runner.py:101}} INFO - Job 9: Subtask listen_for_file_drop [2019-08-06 13:00:08,595] {{timeout.py:41}} ERROR - Process timed out
[2019-08-06 13:00:08,612] {{models.py:1788}} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1652, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 97, in execute
    while not self.poke(context):
  File "/usr/local/airflow/dags/ProcessingStage/sensors/sensors.py", line 91, in poke
    time.sleep(30)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 42, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
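For context, CorrectedS3KeySensor lives in dags/ProcessingStage/sensors/sensors.py; a simplified sketch of what its poke looks like, reconstructed from the traceback above (the real class has more logic, so treat this as an approximation):

import time

from airflow.sensors.s3_key_sensor import S3KeySensor


class CorrectedS3KeySensor(S3KeySensor):
    def poke(self, context):
        # The sleep below is the line 91 of sensors.py that the
        # one-hour timeout interrupts in the traceback.
        time.sleep(30)
        return super().poke(context)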