0 votes

We have recently tried to adopt Airflow as our "data workflow" engine, and while I have figured most things out, I am still in a grey area about how the scheduler calculates when to trigger DAGs.

Take a look at this simple DAG:

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

dag_options = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime.now()
}

with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='date',
        dag=dag)

The scheduler will pick this up, but will not execute it. Now if I change the "start_date" to:

datetime(year=xxxx, month=yyyy, day=zzzz)

where xxxx, yyyy, zzzz make up today's date, it will start executing. Is the cause of this that the scheduler keeps re-reading the DAGs from the source DAG folder, executing datetime.now() each time, noticing the start date differs from the one currently queued, re-adding the DAG, and therefore re-scheduling/pushing the execution date forward (my dag_dir_list_interval is 300)?
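That suspicion can be illustrated with a small sketch (plain Python mimicking repeated file parses, not scheduler internals; the interval and sleep are stand-ins):

from datetime import datetime, timedelta
import time

# Sketch of the suspected problem: every re-parse of the DAG file
# re-evaluates datetime.now(), so start_date, and with it the first
# due run, keeps sliding forward and the interval never fully elapses.
interval = timedelta(minutes=5)

for parse in range(3):
    start_date = datetime.now()  # re-evaluated on every parse
    print(f"parse {parse}: first run not due before {start_date + interval}")
    time.sleep(1)  # stand-in for the dag_dir_list_interval gap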

Also, in Airflow, as I understand it, when a DAG is un-paused (or added with dags_are_paused_at_creation = False), the scheduler will schedule the executions as follows:

  • 1st dag execution: instant after (start_date + interval)
  • 2nd dag execution: instant after (start_date + (interval * 2))
  • 3rd dag execution: instant after (start_date + (interval * 3))

Is this a correct assumption?
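If that assumption holds, the expected trigger times fall out of simple datetime arithmetic (a minimal sketch with made-up values, not Airflow source code):

from datetime import datetime, timedelta

# Under the assumption above, run N is triggered just after
# start_date + N * interval has elapsed.
start_date = datetime(2017, 7, 30, 20, 0)  # hypothetical start date
interval = timedelta(minutes=10)           # hypothetical interval

for n in range(1, 4):
    print(f"run {n}: triggered just after {start_date + n * interval}")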

UPDATE (7/30/2017)

Based on the assumption above, I created this DAG today (07/30/2017):

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

dag_options = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(year=2017, month=7, day=30, hour=20, minute=10)
}

with DAG('test_dag_100', schedule_interval="*/10 * * * *",
         default_args=dag_options) as dag:
    task1 = BashOperator(
        task_id='task_100',
        bash_command='date',
        dag=dag)

which should run at (UTC):

  • 7/30/2017 20:20:00
  • 7/30/2017 20:30:00
  • 7/30/2017 20:40:00
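Those times match what the croniter library (which, as far as I know, Airflow uses internally to expand cron expressions) yields for this schedule:

from datetime import datetime
from croniter import croniter

# Enumerate the schedule points "*/10 * * * *" produces after the start
# date. These are the execution dates; Airflow kicks off each run one
# schedule period later, once that period has closed.
start_date = datetime(2017, 7, 30, 20, 10)
it = croniter("*/10 * * * *", start_date)

for _ in range(3):
    print(it.get_next(datetime))  # 20:20, 20:30, 20:40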

Unfortunately this is not happening. Here are some screenshots of my dashboard:

Can someone explain why the DAG did not execute at 20:21:00? After 20:31:00 it still had not executed... What am I missing here?

By the way, I also noticed that, for some reason, every time I kick off a DAG manually through the dashboard, it just sits in the "running" state. Why is this? Does kicking it off manually have anything to do with any of the start timing options (start_date/interval/etc.)?

Thank you for any clarifications you can provide.

1
The schedule interval is a crontab expression; you can try crontab.guru to test what your interval means. If you are using 1.8+, datetime.now() is not considered best practice; you can find more detail here: github.com/apache/incubator-airflow/blob/master/… - Chengzhi

1 Answer

2 votes

Your assumptions are correct. Airflow will schedule the first DAG run after the specified schedule interval has elapsed from the start date. Using datetime.now() as the start date will result in Airflow rarely, if ever, triggering a DAG. This is mentioned in the scheduling docs.

If you were to specify a fixed start date, such as datetime(2017, 7, 27, 1, 0) with a schedule interval of "5 * * * *" (minute 5 of every hour), then the first run will have an execution date of 1:05am on 7/27 and will actually be kicked off once that schedule period has passed, around 2:05am. It'll continue to run every hour at minute 5 after that.
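To make the execution-date versus trigger-time distinction concrete, here is a small sketch using croniter (illustrative only; exact timing depends on scheduler internals):

from datetime import datetime
from croniter import croniter

# The first schedule point after the start date is the run's
# execution_date; the run is actually kicked off once that period
# closes, i.e. at the next schedule point roughly an hour later.
start_date = datetime(2017, 7, 27, 1, 0)
it = croniter("5 * * * *", start_date)

execution_date = it.get_next(datetime)  # 2017-07-27 01:05
trigger_time = it.get_next(datetime)    # 2017-07-27 02:05
print(f"execution_date={execution_date}, triggered around {trigger_time}")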