1
votes

I am trying to define dynamics dags in Airflow from a custom function.

If the function is defined in the dag file, then the dag is effectively created.

BUT, if I put this function in a custom library (installed in the airflow env with pip) and import it in the dag file, the dag is never created, though not throwing any errors.

My actual code roughly looks like :

  1. library.py

     def create_dag(dag_id, main, default_args, schedule_interval):
         dag = DAG(
             dag_id=dag_id,
             default_args=default_args,
             schedule_interval=None
         )
    
         with dag:
             t1 = PythonOperator(
                 task_id="main",
                 python_callable=main
             )
    
         return dag
    
  2. dag.py

     from library import create_dag
    
     dag_id = "test"
    
     def main():
         print("foo")
    
     globals()[dag_id] = create_dag(
         dag_id,
         main,
         default_args,
         None
     )
    

As explained, if I define this function directly in dag.py, everything works fine. Any thoughts would be welcome !

Thanks in advance

1

1 Answers

1
votes

Finally found the solution in the doc : "Does the file containing your DAG contain the string “airflow” and “DAG” somewhere in the contents? When searching the DAG directory, Airflow ignores files not containing “airflow” and “DAG” in order to prevent the DagBag parsing from importing all python files collocated with user’s DAGs." By moving the function into another folder, I was deleting the word "DAG" from the file.