
I have been working with Airflow (on Cloud Composer) for a year now, and I am having difficulty finding out how the (Celery) workers know what actions to perform when they receive a task to execute.

From what I understand:

  • We put some DAGs in the /dags folder (a minimal example DAG is sketched after this list).
  • The scheduler, in a loop, parses the DAGs and saves the results in the metadata DB; it also determines whether a task from a DAG has to run, based on its dependencies.
  • If some tasks have to run, the executor sends them to a queue that the Celery workers listen to.
  • One of the Celery workers picks up the task and does the job.
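
To make the first step concrete, here is a minimal sketch of a DAG file that could be dropped into the /dags folder. The dag_id and task_id are hypothetical, chosen to match the log line quoted below; the Airflow 2 import path is shown, with the 1.10 path noted in a comment.

    # dags/dag_to_exec.py - hypothetical minimal DAG; both the scheduler and
    # the Celery workers will parse this file.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    # On Airflow 1.10 the import is:
    # from airflow.operators.python_operator import PythonOperator


    def _do_the_work():
        print("Hello from task_to_exec")


    with DAG(
        dag_id="dag_to_exec",
        start_date=datetime(2021, 6, 1),
        schedule_interval="@hourly",
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="task_to_exec",
            python_callable=_do_the_work,
        )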

But how does the Celery worker know what to execute? I can see a log line saying:

[2021-06-30 12:58:59,814] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'dag_to_exec', 'task_to_exec', '2021-06-30T12:57:09+00:00', '--job_id', '2822201', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/dag_to_exec.py', '--cfg_path', '/tmp/tmpank91zop']

Please correct me if I'm wrong, but is the '-sd', 'DAGS_FOLDER/dag_to_exec.py' part there to tell this Airflow worker "execute this task from this DAG, which is saved there"? So the Airflow worker also needs to parse the DAG in order to understand it, right? I say "also" because the scheduler already parsed it earlier.

If you have links to share, or can point at the part of the source code to look at to understand this, thanks in advance!

You can check Airflow's GitHub repository for the source code; airflow/airflow/task/task_runner is a good starting point if you are interested in Airflow's task runners. This article could also help to understand, at a high level, how Airflow distributes jobs to Celery workers. – Ricco D

1 Answer


Yes, your understanding is correct.

The DAGs are parsed by both the workers and the scheduler. The scheduler never calls the execute() methods of the BaseOperator-derived objects. It parses the DAG files, constructs the DAGs and operators as Python objects, and builds the relationships between them, so that it knows what should be scheduled.
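
A rough way to see what "parsing" produces is to use DagBag from the Airflow codebase (the scheduler goes through its own DAG file processing machinery, but it ends up with the same kind of objects); the folder path and dag_id below are hypothetical:

    from airflow.models import DagBag

    # Parsing the files builds DAG and operator objects plus the relationships
    # between them; no execute() method is called at this point.
    dag_bag = DagBag(dag_folder="/home/airflow/gcs/dags", include_examples=False)
    dag = dag_bag.get_dag("dag_to_exec")

    for task in dag.tasks:
        # This structural information is all the scheduler needs
        # to decide what should be scheduled.
        print(task.task_id, "upstream:", sorted(task.upstream_task_ids))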

This parse/create step is re-executed by each worker just before it executes the task, so that it can rebuild the BaseOperator-derived objects (including the dependencies, although those are not important for the worker), pick the right task (i.e. the BaseOperator-derived object identified by its task_id), and run its execute() method (there are a few nuances, such as the pre_execute and post_execute methods also being called).
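
Conceptually, the worker side boils down to something like the sketch below. This is a deliberately simplified illustration, not the real code path (the worker goes through the task runner and TaskInstance, and builds a full execution context rather than an empty dict); the dag_id and task_id are hypothetical, matching the example above:

    from airflow.models import DagBag

    # Re-parse the DAG file(s) on the worker and pick the task to run.
    dag_bag = DagBag(dag_folder="/home/airflow/gcs/dags", include_examples=False)
    task = dag_bag.get_dag("dag_to_exec").get_task("task_to_exec")

    context = {}  # the real worker builds a proper execution context here
    task.pre_execute(context=context)
    result = task.execute(context=context)
    task.post_execute(context=context, result=result)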

This is described at a high level in a couple of places, most easily reachable from https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#workloads

If you think such a description of the class/parsing relation is not easy to find or understand, I heartily invite you to help the community and add such a description from the perspective of a person coming to Airflow for the first time. It is always best if people trying to understand something for the first time provide more context for those who come after them (as long-time Airflow committers, we have a lot of assumptions in our heads).

It's actually super easy: https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html#workloads contains a "Suggest a change" link at the bottom, and you can use it to make a PR to the docs (make sure you fork the airflow repo first).