1 vote

I am currently working with Airflow and Celery to process files. A worker needs to download files, process them, and re-upload them afterwards. My DAGs are fine with only one worker, but when I add another, things get complicated.

Workers take tasks as they become available. Worker1 can pick up the "processing downloaded files" task even though it was Worker2 that took the "downloading files" task, so the task fails because it cannot process files that do not exist on its machine.

Is there a way to tell the workers (or the scheduler) that a DAG must run entirely on a single worker? I know about queues, but I am already using them.

1
The easiest way to avert this problem would be to merge downloading and processing into a single task. BTW, what's the problem with queues? – y2k-shubham
Thank you for your answer, but there are actually more than 3 tasks, so merging might not be a good idea. The problem with queues is that I already use them, and I want my Airflow installation to be scalable, without having to change my DAG files every time I add or remove a worker. – Citysto

1 Answer

0 votes

In this case, you can use an Airflow Variable to store the names of all your worker nodes. For example:

  • Variable: worker_list
  • Value: boxA, boxB, boxC
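
If you prefer to create that Variable from code rather than from the Admin UI, a minimal sketch (assuming the key worker_list used above) could look like this:

    from airflow.models import Variable

    # Store the worker boxes as a plain comma-separated string;
    # the DAG code below splits it back into a list.
    Variable.set("worker_list", "boxA, boxB, boxC")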

When you run the Airflow worker, you can specify which queues it should listen to. For example: airflow worker -q job_queue1,job_queue2. For your case, I would run airflow worker -q af_<hostname> on each box.

In your DAG code, you just need to read that worker_list Airflow Variable, select a box at random, and assign all of the DAG's tasks to the af_<random_selected_box> queue.
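
Putting it together, a minimal sketch of such a DAG (assuming Airflow 1.x with the CeleryExecutor; the DAG id, task ids and bash commands are placeholders) could look like this:

    import random
    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.bash_operator import BashOperator

    # Read the list of worker boxes and pick one at random; every task in
    # this DAG is sent to that box's queue, so downloading, processing and
    # uploading all run on the same machine.
    worker_list = [w.strip() for w in Variable.get("worker_list").split(",")]
    queue_name = "af_{}".format(random.choice(worker_list))

    dag = DAG(
        dag_id="single_worker_pipeline",
        start_date=datetime(2019, 1, 1),
        schedule_interval="@daily",
    )

    download = BashOperator(
        task_id="download_files",
        bash_command="echo downloading",
        queue=queue_name,
        dag=dag,
    )

    process = BashOperator(
        task_id="process_files",
        bash_command="echo processing",
        queue=queue_name,
        dag=dag,
    )

    upload = BashOperator(
        task_id="upload_files",
        bash_command="echo uploading",
        queue=queue_name,
        dag=dag,
    )

    download >> process >> upload

Note that the random choice happens each time the scheduler parses the DAG file, so the selected box can change from one parse to the next; if you need it pinned for longer, you would have to persist the choice somewhere (for example, in another Variable).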