I've set up a Docker environment running:
- Airflow Webserver
- Airflow Scheduler
- Flower
- 2 Airflow Workers (though the issue is reproducible with just 1 Worker)
- Redis
That's six images in total, spread across 4 t2.small EC2 instances in a single ECS cluster, with a db.t2.micro PostgreSQL RDS instance.
Using the CeleryExecutor, nearly all queued tasks sent to workers fail. Upon receiving tasks, the workers seem to lose communication with each other and/or the scheduler: they drift apart, miss heartbeats, and are eventually forcefully killed by the host system.
I'm able to reproduce this behavior on Airflow 1.10.3 (and the latest 1.10.4RC) using the latest versions of both Redis and RabbitMQ, and Celery 4.3.0.
I've adjusted the commonly suggested configuration options, including:
- scheduler__scheduler_heartbeat_sec (currently 180 seconds)
- scheduler__job_heartbeat_sec (currently default 5 seconds)
- scheduler__max_threads (currently just 1 thread)
- celery_broker_transport_options__visibility_timeout (currently 21600 seconds)
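For anyone trying to reproduce the setup, here is a minimal sketch of how the option names above could map to container environment variables. It assumes the options are passed using Airflow's AIRFLOW__{SECTION}__{KEY} naming convention; the helper names (airflow_env_name, as_env) are hypothetical and only for illustration.

```python
# Assumption: each option above is written as "section__key" and is passed to
# the containers as an AIRFLOW__{SECTION}__{KEY} environment variable.

def airflow_env_name(option: str) -> str:
    """Turn 'section__key' into 'AIRFLOW__SECTION__KEY'."""
    section, key = option.split("__", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Values as listed in the question.
SETTINGS = {
    "scheduler__scheduler_heartbeat_sec": 180,
    "scheduler__job_heartbeat_sec": 5,
    "scheduler__max_threads": 1,
    "celery_broker_transport_options__visibility_timeout": 21600,
}


def as_env(settings: dict) -> dict:
    """Build an environment mapping (all values as strings)."""
    return {airflow_env_name(name): str(value) for name, value in settings.items()}


if __name__ == "__main__":
    for name, value in as_env(SETTINGS).items():
        print(f"{name}={value}")
```

The printed NAME=value pairs could then be copied into a task definition or an env file, depending on how the containers are launched.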
Below is a DAG run that runs 5 SQL queries that set permissions across schemas.
- Running these queries manually takes seconds.
- LocalExecutor in a non-dockerized environment will run the DAG in ~30 seconds.
- CeleryExecutor in this new docker environment is still trying to run the first try for each task ~300 seconds into the run.
Scheduler:
[2019-07-29 01:20:23,407] {{jobs.py:1106}} INFO - 5 tasks up for execution:
<TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
[2019-07-29 01:20:23,414] {{jobs.py:1144}} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 5 task instances in queue
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 0/16 running and queued tasks
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 1/16 running and queued tasks
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 2/16 running and queued tasks
[2019-07-29 01:20:23,422] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 3/16 running and queued tasks
[2019-07-29 01:20:23,422] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 4/16 running and queued tasks
[2019-07-29 01:20:23,423] {{jobs.py:1223}} INFO - Setting the follow tasks to queued state:
<TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
[2019-07-29 01:20:23,440] {{jobs.py:1298}} INFO - Setting the following 5 tasks to queued state:
<TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
[2019-07-29 01:20:23,440] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'service_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,444] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'service_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,445] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'marketing_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,446] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'marketing_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,446] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'finance_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,446] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'finance_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,446] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'engineering_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,447] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'engineering_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,447] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'bi_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,447] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'bi_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:21:25,589] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.marketing_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:21:25,599] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.engineering_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:21:56,111] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.service_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:22:28,133] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.bi_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
Worker 1:
[2019-07-29 01:20:23,593: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[cb066498-e350-43c1-a23d-1bc33929717a]
[2019-07-29 01:20:23,605: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'service_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,627: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[d835c30a-e2bd-4f78-b291-d19b7bccad68]
[2019-07-29 01:20:23,637: INFO/ForkPoolWorker-1] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'finance_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:25,260] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=44
[2019-07-29 01:20:25,263] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=45
[2019-07-29 01:20:25,878] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:25,881] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:26,271] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:26,276] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:26,601] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host b4b0a799a7ca
[2019-07-29 01:20:26,604] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.service_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host b4b0a799a7ca
[2019-07-29 01:20:39,364: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7
[2019-07-29 01:21:46,121: WARNING/MainProcess] Substantial drift from celery@0f9db941bdd7 may mean clocks are out of sync. Current drift is
70 seconds. [orig: 2019-07-29 01:21:46.117058 recv: 2019-07-29 01:20:36.485961]
[2019-07-29 01:21:46,127: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:21:46,294: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:21:49,853: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:62 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:29,230: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:63 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:44,002: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7
[2019-07-29 01:22:52,073: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:64 exited with 'signal 9 (SIGKILL)'
Worker 2:
[2019-07-29 01:20:23,605: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[dbb9b813-255e-4284-b067-22b990d8b9a2]
[2019-07-29 01:20:23,609: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'marketing_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,616: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[42ee3e3a-620e-47da-add2-e5678973d87e]
[2019-07-29 01:20:23,622: INFO/ForkPoolWorker-1] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'engineering_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,632: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[be609901-60bc-4dcc-9374-7c802171f2db]
[2019-07-29 01:20:23,638: INFO/ForkPoolWorker-3] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'bi_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:26,124] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=45
[2019-07-29 01:20:26,127] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=46
[2019-07-29 01:20:26,135] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=44
[2019-07-29 01:20:27,025] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,033] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,047] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,798] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:27,801] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:27,806] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:28,426] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:28,426] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:28,437] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:56,752: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:20:56,764: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:20:56,903: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:20:57,623: WARNING/MainProcess] Substantial drift from celery@b4b0a799a7ca may mean clocks are out of sync. Current drift is
25 seconds. [orig: 2019-07-29 01:20:57.622959 recv: 2019-07-29 01:20:32.629294]
[2019-07-29 01:20:57,631: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:24 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:20:57,837: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:20:58,513: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:65 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,076: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:22:23,089: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:67 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,105: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:66 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,116: ERROR/MainProcess] Process 'ForkPoolWorker-3' pid:26 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,191: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:22:26,758: ERROR/MainProcess] Process 'ForkPoolWorker-22' pid:70 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:26,770: ERROR/MainProcess] Process 'ForkPoolWorker-21' pid:69 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:26,781: ERROR/MainProcess] Process 'ForkPoolWorker-20' pid:68 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:29,988: WARNING/MainProcess] process with pid=65 already exited
[2019-07-29 01:22:29,991: ERROR/MainProcess] Process 'ForkPoolWorker-24' pid:75 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:30,002: ERROR/MainProcess] Process 'ForkPoolWorker-23' pid:71 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:30,017: ERROR/MainProcess] Process 'ForkPoolWorker-16' pid:43 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,202: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:23:14,206: ERROR/MainProcess] Process 'ForkPoolWorker-28' pid:79 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,221: ERROR/MainProcess] Process 'ForkPoolWorker-27' pid:78 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,231: ERROR/MainProcess] Process 'ForkPoolWorker-26' pid:77 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,242: ERROR/MainProcess] Process 'ForkPoolWorker-25' pid:76 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,252: ERROR/MainProcess] Process 'ForkPoolWorker-14' pid:41 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,503: ERROR/MainProcess] Process 'ForkPoolWorker-33' pid:87 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,572: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:86 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,622: ERROR/MainProcess] Process 'ForkPoolWorker-31' pid:85 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,646: ERROR/MainProcess] Process 'ForkPoolWorker-30' pid:84 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,828: ERROR/MainProcess] Process 'ForkPoolWorker-29' pid:83 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:43,361: WARNING/MainProcess] process with pid=84 already exited
[2019-07-29 01:23:43,723: ERROR/MainProcess] Process 'ForkPoolWorker-38' pid:92 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:44,119: ERROR/MainProcess] Process 'ForkPoolWorker-37' pid:91 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:44,536: ERROR/MainProcess] Process 'ForkPoolWorker-36' pid:90 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:45,203: ERROR/MainProcess] Process 'ForkPoolWorker-35' pid:89 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:45,510: ERROR/MainProcess] Process 'ForkPoolWorker-34' pid:88 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:10,947: WARNING/MainProcess] process with pid=68 already exited
[2019-07-29 01:24:11,579: ERROR/MainProcess] Process 'ForkPoolWorker-43' pid:97 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:12,288: ERROR/MainProcess] Process 'ForkPoolWorker-42' pid:96 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:13,880: ERROR/MainProcess] Process 'ForkPoolWorker-41' pid:95 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:14,775: ERROR/MainProcess] Process 'ForkPoolWorker-40' pid:94 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:15,762: ERROR/MainProcess] Process 'ForkPoolWorker-39' pid:93 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:25:05,623: WARNING/MainProcess] process with pid=75 already exited
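To make runs easier to compare, here is a rough sketch for summarizing worker logs like the ones above: it counts the pool processes reported as killed by signal 9 and recomputes the clock drift from the orig/recv timestamps. The regular expressions are written against the exact log lines shown here and are an assumption about the format in general; the function names are hypothetical.

```python
import re
from datetime import datetime

# Patterns based on the worker log lines pasted above (assumed format).
SIGKILL_RE = re.compile(
    r"Process '(ForkPoolWorker-\d+)' pid:(\d+) exited with 'signal 9 \(SIGKILL\)'"
)
DRIFT_RE = re.compile(r"\[orig: ([\d-]+ [\d:.]+) recv: ([\d-]+ [\d:.]+)\]")
TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def count_sigkills(log_text: str) -> int:
    """Number of pool processes the worker reported as killed by SIGKILL."""
    return len(SIGKILL_RE.findall(log_text))


def drifts_in_seconds(log_text: str) -> list:
    """Difference (orig - recv) in seconds for every drift warning found."""
    drifts = []
    for orig, recv in DRIFT_RE.findall(log_text):
        delta = datetime.strptime(orig, TS_FORMAT) - datetime.strptime(recv, TS_FORMAT)
        drifts.append(delta.total_seconds())
    return drifts


# A few lines copied from the Worker 1 log above.
SAMPLE = """\
[2019-07-29 01:21:46,121: WARNING/MainProcess] Substantial drift from celery@0f9db941bdd7 may mean clocks are out of sync. Current drift is
70 seconds. [orig: 2019-07-29 01:21:46.117058 recv: 2019-07-29 01:20:36.485961]
[2019-07-29 01:21:46,127: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:21:46,294: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
[2019-07-29 01:21:49,853: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:62 exited with 'signal 9 (SIGKILL)'
"""

if __name__ == "__main__":
    print("SIGKILLed pool processes:", count_sigkills(SAMPLE))
    print("Drift values (s):", drifts_in_seconds(SAMPLE))
```

On the sample lines, the recomputed drift comes out just under the 70 seconds that Celery reports, which is consistent with the reported value being rounded.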
Suggestions on what's going on and how to remedy this?
Try setting web_server_worker_timeout in config to a higher value. Maybe 300. – nightgaunt