
I have an Airflow cluster made of 3 worker nodes, using the CeleryExecutor with RabbitMQ as the message broker. My DAGs usually consist of tasks that download files, unzip them, upload them to Hadoop, and so on. These tasks depend on each other and must all run on a single machine/node.

When Airflow schedules the tasks of a single DAG across different nodes, I end up with errors, since the tasks depend on files produced on the same machine. I need all tasks within a DAG to run on a single node.

I tried setting dag_concurrency = 1 and max_active_runs_per_dag = 1, both in airflow.cfg and when initializing the DAG (DAG(concurrency=1, max_active_runs=1)), with no success.

Rest of my airflow.cfg:

parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16

As far as I understand, setting dag_concurrency to 1 should do the trick, so what am I missing here?
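
For reference, this is roughly how I initialize the DAG (the DAG id and schedule below are placeholders):

from airflow import DAG
from airflow.utils.dates import days_ago

dag = DAG(
    'download_unzip_upload',     # placeholder DAG id
    start_date=days_ago(1),
    schedule_interval='@daily',  # placeholder schedule
    concurrency=1,               # mirrors dag_concurrency = 1
    max_active_runs=1,           # mirrors max_active_runs_per_dag = 1
)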


1 Answer


CeleryExecutor supports multiple queues: you can assign a specific queue to each operator (queue is an attribute of BaseOperator) and then subscribe each worker to that queue. Note that a worker can listen to one or multiple queues.

From the docs:

Workers can listen to one or multiple queues of tasks. When a worker is started (using the command airflow celery worker), a set of comma-delimited queue names can be specified (e.g. airflow celery worker -q spark). This worker will then only pick up tasks wired to the specified queue(s)

This is an example DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
}
dag = DAG('dist_example',
          schedule_interval='@once',
          catchup=False,
          default_args=default_args
          )
# Simple command that prints the hostname of whichever worker runs the task
get_hostname = 'echo $(hostname)'

t1 = BashOperator(
    task_id='task_for_q1',
    bash_command=get_hostname,
    queue='queue_1',  # only workers subscribed to queue_1 will pick up this task
    dag=dag
)
t2 = BashOperator(
    task_id='task_for_q2',
    bash_command=get_hostname,
    queue='queue_2',  # only workers subscribed to queue_2 will pick up this task
    dag=dag
)
t1 >> t2

worker_1: airflow celery worker -q default,queue_1

worker_2: airflow celery worker -q default,queue_2

By listening to both your specific queue and the default queue (defined by the default_queue config key), you won't affect the standard multi-worker behavior for any other tasks.
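
In your case, you could pin an entire DAG to one queue, and therefore one worker, by putting queue in default_args so that every task inherits it. Here is a rough sketch, with illustrative queue, DAG and task names:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'queue': 'queue_1',  # every task in this DAG inherits this queue
}

dag = DAG('single_node_pipeline',  # illustrative DAG id
          schedule_interval='@once',
          catchup=False,
          default_args=default_args
          )

# Illustrative stand-ins for your download/unzip/upload steps
download = BashOperator(task_id='download', bash_command='echo download', dag=dag)
unzip = BashOperator(task_id='unzip', bash_command='echo unzip', dag=dag)
upload = BashOperator(task_id='upload', bash_command='echo upload', dag=dag)

download >> unzip >> upload

If only worker_1 subscribes to queue_1, all three tasks land on the same machine. The trade-off is that the DAG is tied to that specific worker rather than to whichever node happens to be free.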

I hope that works for you!