0 votes

I've set up a Docker environment running:

  • Airflow Webserver
  • Airflow Scheduler
  • Flower
  • 2 Airflow Workers (though the issue is reproducible with just 1 Worker)
  • Redis

Six services in total, spread across 4 t2.small EC2 instances in a single ECS cluster, with a db.t2.micro PostgreSQL RDS instance as the metadata database.
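
For context, the layout is roughly the following (a minimal docker-compose-style sketch, not my actual ECS task definitions; the image name, credentials, and hostnames are placeholders):

version: "3"
services:
  redis:
    image: redis:5-alpine

  webserver:
    image: my-airflow:1.10.3            # placeholder image tag
    command: ["airflow", "webserver"]
    environment: &airflow_env
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:***@<rds-endpoint>:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/1
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:***@<rds-endpoint>:5432/airflow

  scheduler:
    image: my-airflow:1.10.3
    command: ["airflow", "scheduler"]
    environment: *airflow_env

  flower:
    image: my-airflow:1.10.3
    command: ["airflow", "flower"]
    environment: *airflow_env

  worker:                               # two of these in the real setup
    image: my-airflow:1.10.3
    command: ["airflow", "worker"]
    environment: *airflow_env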

Using the CeleryExecutor, nearly all queued tasks sent to the workers fail. Upon receiving tasks, the workers seem to lose communication with each other and/or the scheduler: they drift apart, miss heartbeats, and their task processes are eventually killed off (SIGKILL) by the host system.

I'm able to reproduce this behavior on Airflow 1.10.3 (and the latest 1.10.4RC), using the latest versions of both Redis and RabbitMQ as the broker, along with Celery 4.3.0.

I've also adjusted the commonly suggested configuration options, including the following (a sketch of how these map to airflow.cfg follows this list):

  • scheduler__scheduler_heartbeat_sec (currently 180 seconds)
  • scheduler__job_heartbeat_sec (currently default 5 seconds)
  • scheduler__max_threads (currently just 1 thread)
  • celery_broker_transport_options__visibility_timeout (currently 21600 seconds)
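
For reference, here's roughly how those currently look, assuming they map onto the standard [scheduler] and [celery_broker_transport_options] sections of airflow.cfg (the same values can also be supplied as AIRFLOW__<SECTION>__<KEY> environment variables in the containers):

[scheduler]
# how often the scheduler loop tries to trigger new task instances
scheduler_heartbeat_sec = 180
# how often running task instances heartbeat / listen for external kill signals
job_heartbeat_sec = 5
# number of scheduler threads used to schedule DAGs
max_threads = 1

[celery_broker_transport_options]
# how long a task may sit unacknowledged before the broker redelivers it
visibility_timeout = 21600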

Below is a DAG run that executes 5 SQL queries to set permissions across schemas; a rough sketch of the DAG follows this list.

  • Running these queries manually takes seconds
  • LocalExecutor in a non-dockerized environment will run the DAG in ~30 seconds.
  • CeleryExecutor in this new docker environment is still trying to run the first try for each task ~300 seconds into the run.
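
Rough sketch of the DAG (the GRANT statements are placeholders and the connection id is a made-up name, just for illustration):

from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 7, 1),
}

dag = DAG(
    "ldw_reset_permissions",
    default_args=default_args,
    schedule_interval=None,  # triggered manually
)

# one task per reader group; each runs a short batch of GRANT statements
for group in ["service", "marketing", "finance", "engineering", "bi"]:
    PostgresOperator(
        task_id="{}_readers".format(group),
        postgres_conn_id="ldw",  # hypothetical connection id
        sql="GRANT USAGE ON SCHEMA {g} TO {g}_readers;".format(g=group),  # placeholder SQL
        dag=dag,
    )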

Scheduler:

[2019-07-29 01:20:23,407] {{jobs.py:1106}} INFO - 5 tasks up for execution:
        <TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
[2019-07-29 01:20:23,414] {{jobs.py:1144}} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 5 task instances in queue
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 0/16 running and queued tasks
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 1/16 running and queued tasks
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 2/16 running and queued tasks
[2019-07-29 01:20:23,422] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 3/16 running and queued tasks
[2019-07-29 01:20:23,422] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 4/16 running and queued tasks
[2019-07-29 01:20:23,423] {{jobs.py:1223}} INFO - Setting the follow tasks to queued state:
        <TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
        <TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
[2019-07-29 01:20:23,440] {{jobs.py:1298}} INFO - Setting the following 5 tasks to queued state:
        <TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
        <TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
        <TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
        <TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
        <TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
[2019-07-29 01:20:23,440] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'service_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,444] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'service_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,445] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'marketing_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,446] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'marketing_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,446] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'finance_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,446] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'finance_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,446] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'engineering_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,447] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'engineering_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,447] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'bi_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,447] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'bi_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:21:25,589] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.marketing_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:21:25,599] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.engineering_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:21:56,111] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.service_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:22:28,133] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.bi_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1

Worker 1:

[2019-07-29 01:20:23,593: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[cb066498-e350-43c1-a23d-1bc33929717a]
[2019-07-29 01:20:23,605: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'service_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,627: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[d835c30a-e2bd-4f78-b291-d19b7bccad68]
[2019-07-29 01:20:23,637: INFO/ForkPoolWorker-1] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'finance_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:25,260] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=44
[2019-07-29 01:20:25,263] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=45
[2019-07-29 01:20:25,878] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:25,881] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:26,271] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:26,276] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:26,601] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host b4b0a799a7ca
[2019-07-29 01:20:26,604] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.service_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host b4b0a799a7ca
[2019-07-29 01:20:39,364: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7
[2019-07-29 01:21:46,121: WARNING/MainProcess] Substantial drift from celery@0f9db941bdd7 may mean clocks are out of sync.  Current drift is
70 seconds.  [orig: 2019-07-29 01:21:46.117058 recv: 2019-07-29 01:20:36.485961]
[2019-07-29 01:21:46,127: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:21:46,294: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:21:49,853: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:62 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:29,230: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:63 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:44,002: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7
[2019-07-29 01:22:52,073: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:64 exited with 'signal 9 (SIGKILL)'

Worker 2:

[2019-07-29 01:20:23,605: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[dbb9b813-255e-4284-b067-22b990d8b9a2]
[2019-07-29 01:20:23,609: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'marketing_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,616: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[42ee3e3a-620e-47da-add2-e5678973d87e]
[2019-07-29 01:20:23,622: INFO/ForkPoolWorker-1] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'engineering_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,632: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[be609901-60bc-4dcc-9374-7c802171f2db]
[2019-07-29 01:20:23,638: INFO/ForkPoolWorker-3] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'bi_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:26,124] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=45
[2019-07-29 01:20:26,127] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=46
[2019-07-29 01:20:26,135] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=44
[2019-07-29 01:20:27,025] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,033] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,047] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,798] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:27,801] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:27,806] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:28,426] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:28,426] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:28,437] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:56,752: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:20:56,764: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:20:56,903: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:20:57,623: WARNING/MainProcess] Substantial drift from celery@b4b0a799a7ca may mean clocks are out of sync.  Current drift is
25 seconds.  [orig: 2019-07-29 01:20:57.622959 recv: 2019-07-29 01:20:32.629294]
[2019-07-29 01:20:57,631: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:24 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:20:57,837: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:20:58,513: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:65 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,076: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:22:23,089: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:67 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,105: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:66 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,116: ERROR/MainProcess] Process 'ForkPoolWorker-3' pid:26 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,191: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:22:26,758: ERROR/MainProcess] Process 'ForkPoolWorker-22' pid:70 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:26,770: ERROR/MainProcess] Process 'ForkPoolWorker-21' pid:69 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:26,781: ERROR/MainProcess] Process 'ForkPoolWorker-20' pid:68 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:29,988: WARNING/MainProcess] process with pid=65 already exited
[2019-07-29 01:22:29,991: ERROR/MainProcess] Process 'ForkPoolWorker-24' pid:75 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:30,002: ERROR/MainProcess] Process 'ForkPoolWorker-23' pid:71 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:30,017: ERROR/MainProcess] Process 'ForkPoolWorker-16' pid:43 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,202: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:23:14,206: ERROR/MainProcess] Process 'ForkPoolWorker-28' pid:79 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,221: ERROR/MainProcess] Process 'ForkPoolWorker-27' pid:78 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,231: ERROR/MainProcess] Process 'ForkPoolWorker-26' pid:77 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,242: ERROR/MainProcess] Process 'ForkPoolWorker-25' pid:76 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,252: ERROR/MainProcess] Process 'ForkPoolWorker-14' pid:41 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,503: ERROR/MainProcess] Process 'ForkPoolWorker-33' pid:87 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,572: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:86 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,622: ERROR/MainProcess] Process 'ForkPoolWorker-31' pid:85 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,646: ERROR/MainProcess] Process 'ForkPoolWorker-30' pid:84 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,828: ERROR/MainProcess] Process 'ForkPoolWorker-29' pid:83 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:43,361: WARNING/MainProcess] process with pid=84 already exited
[2019-07-29 01:23:43,723: ERROR/MainProcess] Process 'ForkPoolWorker-38' pid:92 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:44,119: ERROR/MainProcess] Process 'ForkPoolWorker-37' pid:91 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:44,536: ERROR/MainProcess] Process 'ForkPoolWorker-36' pid:90 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:45,203: ERROR/MainProcess] Process 'ForkPoolWorker-35' pid:89 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:45,510: ERROR/MainProcess] Process 'ForkPoolWorker-34' pid:88 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:10,947: WARNING/MainProcess] process with pid=68 already exited
[2019-07-29 01:24:11,579: ERROR/MainProcess] Process 'ForkPoolWorker-43' pid:97 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:12,288: ERROR/MainProcess] Process 'ForkPoolWorker-42' pid:96 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:13,880: ERROR/MainProcess] Process 'ForkPoolWorker-41' pid:95 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:14,775: ERROR/MainProcess] Process 'ForkPoolWorker-40' pid:94 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:15,762: ERROR/MainProcess] Process 'ForkPoolWorker-39' pid:93 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:25:05,623: WARNING/MainProcess] process with pid=75 already exited

Suggestions on what's going on and how to remedy this?

Comments:

Not sure whether this can help, but try setting web_server_worker_timeout in the config to a higher value, maybe 300. – nightgaunt

Thanks for the suggestion @nightgaunt, I went ahead and implemented it, though I believe I've resolved the issue through other means. When running the containers/tasks in the ECS environment, I had assumed CPU units were being auto-allocated; if they were, the allocation was done poorly. After setting CPU units so that each worker was guaranteed its own node, everything has been running fine. – Ryan H

1 Answer

1 vote

Turns out this was an issue with resource allocation on AWS ECS, not with the Airflow configuration itself.

Through more testing and monitoring via htop, I noticed that regardless of the worker count, one worker node would always peg its CPU continuously until the system killed its processes, as shown in the logs above.

The ECS container/task definitions for the Airflow workers did not have CPU units explicitly set; I had assumed they would be allocated automatically since the field isn't required.

Specifying enough CPU units that each worker container/task was placed on its own EC2 instance cleared things right up.
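
For anyone hitting the same thing: the fix amounts to reserving CPU in the worker's ECS container definition so the scheduler can't pack multiple Airflow containers onto one instance. A sketch of the relevant fragment, assuming a t2.small (which registers 1024 CPU units and ~2 GiB of memory); the names and the memory value are illustrative:

{
  "family": "airflow-worker",
  "containerDefinitions": [
    {
      "name": "airflow-worker",
      "image": "my-airflow:1.10.3",
      "command": ["airflow", "worker"],
      "cpu": 1024,
      "memory": 1900,
      "essential": true
    }
  ]
}

Reserving the full 1024 units per worker means ECS can place at most one worker container on each t2.small, which is effectively what "guaranteed its own node" means in the comment thread above.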