
I am using Airflow on Cloud Composer (version: composer-1.10.4-airflow-1.10.6) and I have some issues with the Airflow scheduler: it is slow or sometimes stops.

I launched a "huge" collection run (because I will sometimes need it) with Airflow to test the scalability of my pipelines. The result is that the scheduler apparently only schedules the DAGs with few tasks, while the tasks of the big DAGs are not scheduled. Do you have any insights or advice about that?

Here is some information about my current configuration:

Cluster config:

  • 10 cluster nodes, 20 vCPUs, 160 GB memory

airflow config:

core

  • store_serialized_dags: True
  • dag_concurrency: 160
  • store_dag_code: True
  • min_file_process_interval: 30
  • max_active_runs_per_dag: 1
  • dagbag_import_timeout: 900
  • min_serialized_dag_update_interval: 30
  • parallelism: 160

scheduler

  • processor_poll_interval: 1
  • max_threads: 8
  • dag_dir_list_interval: 30

celery

  • worker_concurrency: 16

webserver

  • default_dag_run_display_number: 5
  • workers: 2
  • worker_refresh_interval: 120
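
For reference, these values are applied as Airflow configuration overrides in Cloud Composer, typically with a command like the following (the environment name and location are placeholders):

    gcloud composer environments update <your-environment> --location <your-location> \
        --update-airflow-configs=core-dag_concurrency=160,core-parallelism=160,scheduler-max_threads=8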

Airflow scheduler DagBag parsing (airflow list_dags -r):

DagBag loading stats for /home/airflow/gcs/dags

Number of DAGs: 27
Total task number: 32229
DagBag parsing time: 22.468404

--------------------+--------------------+---------+----------+--------------------
file                | duration           | dag_num | task_num | dags
--------------------+--------------------+---------+----------+--------------------
/folder__dags/dag1  | 1.83547            |       1 |     1554 | dag1
/folder__dags/dag2  | 1.717692           |       1 |     3872 | dag2
/folder__dags/dag3  | 1.53               |       1 |     3872 | dag3
/folder__dags/dag4  | 1.391314           |       1 |      210 | dag4
/folder__dags/dag5  | 1.267788           |       1 |     3872 | dag5
/folder__dags/dag6  | 1.250022           |       1 |     1554 | dag6
/folder__dags/dag7  | 1.0973419999999998 |       1 |     2904 | dag7
/folder__dags/dag8  | 1.081566           |       1 |     3146 | dag8
/folder__dags/dag9  | 1.019032           |       1 |     3872 | dag9
/folder__dags/dag10 | 0.98541            |       1 |     1554 | dag10
/folder__dags/dag11 | 0.959722           |       1 |      160 | dag11
/folder__dags/dag12 | 0.868756           |       1 |     2904 | dag12
/folder__dags/dag13 | 0.81513            |       1 |      160 | dag13
/folder__dags/dag14 | 0.69578            |       1 |       14 | dag14
/folder__dags/dag15 | 0.617646           |       1 |      294 | dag15
/folder__dags/dag16 | 0.588876           |       1 |      210 | dag16
/folder__dags/dag17 | 0.563712           |       1 |      160 | dag17
/folder__dags/dag18 | 0.55615            |       1 |      726 | dag18
/folder__dags/dag19 | 0.553248           |       1 |      140 | dag19
/folder__dags/dag20 | 0.55149            |       1 |      168 | dag20
/folder__dags/dag21 | 0.543682           |       1 |      168 | dag21
/folder__dags/dag22 | 0.530684           |       1 |      168 | dag22
/folder__dags/dag23 | 0.498442           |       1 |      484 | dag23
/folder__dags/dag24 | 0.46574            |       1 |       14 | dag24
/folder__dags/dag25 | 0.454656           |       1 |       28 | dag25
/create_conf        | 0.022272           |       1 |       20 | create_conf
/airflow_monitoring | 0.006782           |       1 |        1 | airflow_monitoring
--------------------+--------------------+---------+----------+--------------------

Thank you for your help.


1 Answer


The Airflow scheduler processes the files in the DAGs directory in a round-robin fashion, which can cause long delays between tasks: the scheduler cannot enqueue a task whose dependencies have just completed until its round robin returns to that DAG's module. Multiple DAG objects can be defined in the same Python module; although this is generally discouraged from a fault-isolation perspective, it may be necessary here to reduce that scheduling latency (see the sketch below).
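
A minimal sketch of a module that defines two DAGs, using the Airflow 1.10-style imports that match your Composer version (the DAG ids, schedule, and start date are hypothetical placeholders):

    # Sketch only: two DAG objects defined in one module, so both are picked up
    # every time the scheduler parses this single file.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    default_args = {"start_date": datetime(2020, 1, 1)}

    dag_a = DAG("example_dag_a", default_args=default_args, schedule_interval="@daily")
    dag_b = DAG("example_dag_b", default_args=default_args, schedule_interval="@daily")

    # One placeholder task per DAG; real tasks would go here.
    DummyOperator(task_id="start", dag=dag_a)
    DummyOperator(task_id="start", dag=dag_b)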

  • Sometimes the best approach is to restart the scheduler:

    1. Get the cluster credentials as described in the official documentation (an example command is shown below)
    2. Run the following command to restart the scheduler:

kubectl get deployment airflow-scheduler -o yaml | kubectl replace --force -f -
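
For step 1, the command typically looks like the following (the cluster name, zone, and project are placeholders; the actual GKE cluster name is shown on your Composer environment's details page):

    gcloud container clusters get-credentials <your-composer-gke-cluster> --zone <your-zone> --project <your-project>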

Additionally, please restart the Airflow web server; sometimes broken, invalid, or resource-intensive DAGs can cause web server crashes, restarts, or complete downtime. One way to restart it is to remove or upgrade one of the PyPI packages in your environment.

  • Exceeding API usage limits/quotas

To avoid exceeding API usage limits/quotas, or to avoid running too many simultaneous processes, you can define Airflow pools in the Airflow web UI and associate tasks with existing pools in your DAGs. Refer to the Airflow documentation.
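
As a hedged illustration of associating a task with a pool (the pool name api_pool, the DAG, and the callable are hypothetical; the pool itself must first be created under Admin -> Pools in the web UI):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def call_external_api():
        pass  # placeholder for the real API call

    dag = DAG("example_pooled_dag", start_date=datetime(2020, 1, 1), schedule_interval=None)

    PythonOperator(
        task_id="call_api",
        python_callable=call_external_api,
        pool="api_pool",  # the task only runs while a slot in this pool is free
        dag=dag,
    )

Tasks assigned to the pool are queued rather than started once its slots are full, which keeps the number of simultaneous calls bounded.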

  • Check the logs in the Logging section -> Cloud Composer Environment and look for any errors or warnings such as: cannot import module, DagNotFound in DagModel.

Please have a look at my earlier answer regarding memory. Referring to the official documentation:

DAG execution is RAM limited. Each task execution starts with two Airflow processes: task execution and monitoring. Currently, each node can take up to 6 concurrent tasks. More memory can be consumed, depending on the size of the DAG.
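
To relate that to the cluster described in the question: assuming the quoted limit of about 6 concurrent tasks per node, the 10-node cluster can run on the order of 10 × 6 = 60 tasks at the same time, which is well below the configured parallelism and dag_concurrency of 160, so the cluster's memory, rather than those settings, is likely the effective ceiling.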

Moreover, I would like to share with you an interesting article on Medium regarding calculations for resource requests.

I hope you find the above pieces of information useful.