Code:
Python version 2.7.x and airflow version 1.5.1
my dag script is this
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
From that you could see that I am creating a DAG with 6 tasks the first task(Start1) starts first after which all the other five tasks starts
Currently I have given 5 minutes time delay between DAG's starting
It has ran perfectly for all the six tasks the first type but after five minutes the DAG is not re-initiated
It has been more then 1 hour still the DAG is not re-initiated I really don't know were I am wrong .
It would be really nice if some one could point me out what is wrong .I tried clearing using airflow testing clear
then to the same thing happen.It ran first instance then just stood there.
The only thing the command line shows is Getting all instance for DAG testing
When I change the position of the schedule_interval it just runs with out any schedule interval parallel.That is with in 5 minutes 300 or more task instance has been completed. There is no 5 minute schedule interval
Code 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)