13 votes

Code:

Python version 2.7.x and Airflow version 1.5.1.

My DAG script is this:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    'start_date': datetime(2015, 10, 13),
    'email': ['[email protected]'],
    'schedule_interval': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing', default_args=default_args)

run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

for i in range(5):
    t = BashOperator(task_id='Orders1' + str(i), bash_command='sleep 5', dag=dag)
    t.set_upstream(run_this_first)

From that you can see I am creating a DAG with six tasks: the first task (Start1) runs first, after which the other five tasks start.

Currently I have set a 5-minute schedule interval between DAG runs.

It ran perfectly for all six tasks the first time, but after five minutes the DAG was not re-initiated.

It has been more than 1 hour and the DAG has still not been re-initiated; I really don't know where I went wrong.

It would be really nice if someone could point out what is wrong. I tried clearing the DAG with airflow clear testing, and the same thing happened: it ran the first instance and then just stood there.

The only thing the command line shows is "Getting all instance for DAG testing".

When I change the position of the schedule_interval (Code 2 below), it just runs without any schedule interval, in parallel: within 5 minutes, 300 or more task instances have completed. There is no 5-minute schedule interval at all.

Code 2:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    'start_date': datetime(2015, 10, 13),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing', schedule_interval=timedelta(minutes=5), default_args=default_args)  # Schedule here

run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

for i in range(5):
    t = BashOperator(task_id='Orders1' + str(i), bash_command='sleep 5', dag=dag)
    t.set_upstream(run_this_first)

2 Answers

10 votes

For Code 2, I guess the reason why it runs so frequently is:

  1. The start date is 2015-10-13 00:00.

  2. The schedule interval is 5 minutes.

  3. On every scheduler heartbeat (5 seconds by default), your DAG is checked (a sketch of this check follows the list):

    • First check: start date (no last execution date found yet) + schedule interval < current time? If yes, the DAG is executed and the execution time is recorded (e.g. 2015-10-13 00:00 + 5 min < current?).
    • Second check, on the next heartbeat: last execution time + schedule interval < current time? If so, the DAG is executed again.
    • ...
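
A minimal sketch of that check (purely illustrative; this is not Airflow's actual scheduler code, and the names should_trigger and last_execution_date are my own):

from datetime import datetime, timedelta

start_date = datetime(2015, 10, 13)       # from default_args in Code 2
schedule_interval = timedelta(minutes=5)
last_execution_date = None                # nothing has run yet

def should_trigger(now):
    # The check described above, performed on each scheduler heartbeat:
    # (last execution date, or start date if none) + interval < now?
    reference = last_execution_date or start_date
    return reference + schedule_interval < now

# With a start_date weeks in the past this is True on every heartbeat,
# so runs are created as fast as the scheduler can go, not every 5 minutes.
print(should_trigger(datetime.now()))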

The solution is to set the DAG's start_date to datetime.now() - schedule_interval.
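
For example (a sketch based on Code 2; only the start_date changes, the rest stays the same):

from airflow import DAG
from datetime import datetime, timedelta

schedule_interval = timedelta(minutes=5)

default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    # start one interval in the past so the first run fires roughly now,
    # instead of triggering a long backfill
    'start_date': datetime.now() - schedule_interval,
    # email settings from Code 2 omitted here for brevity
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing', schedule_interval=schedule_interval, default_args=default_args)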

Also, if you want to debug:

  1. Set LOGGING_LEVEL to DEBUG in settings.py (a snippet follows the code below).

  2. Modify the class method is_queueable() of airflow.models.TaskInstance to:

def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(
            self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
    ...
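
For step 1, the change would look roughly like this (assuming your Airflow version exposes a LOGGING_LEVEL constant in airflow/settings.py; check the file in your installation):

# airflow/settings.py  (assumed constant name and location)
import logging

LOGGING_LEVEL = logging.DEBUG   # default is logging.INFO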
3 votes

Because the start time (2015-10-13 00:00) is earlier than the current time, it triggers an Airflow backfill. The backfill starts from 2015-10-13 00:00 (the start date) and runs as fast as the scheduler can dispatch it, but the execution dates are spaced 5 minutes apart (the schedule interval).
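
A small sketch of which execution dates such a backfill produces (chosen here to match the first few entries of the log listing below):

from datetime import datetime, timedelta

start_date = datetime(2015, 10, 13)   # from the question's default_args
interval = timedelta(minutes=5)

# The backfill works through one execution date per schedule interval,
# beginning at start_date, until it catches up with the current time:
for n in range(4):
    print(start_date + n * interval)  # 2015-10-13 00:00, 00:05, 00:10, 00:15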

See the log name:

$ tree airflow/logs/testing/
testing/
|-- Orders10
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders11
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders12
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders13
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
|-- Orders14
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   `-- 2015-10-13T00:10:00
`-- Start1
    |-- 2015-10-13T00:00:00
    |-- 2015-10-13T00:05:00
    |-- 2015-10-13T00:10:00
    `-- 2015-10-13T00:15:00

See the create time of logs:

$ ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:52 2015-10-13T00:15:00

Also, you can see the Task Instances in the web UI:

[Screenshot: Airflow Task Instances view]