
I have a DAG that is scheduled every hour, e.g. at 01:00, 02:00 and 03:00. When the 02:00 run is picked up and the 01:00 DAG run is still in progress, I need to cancel the 02:00 instance.

I am trying this code:

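import os
from datetime import datetime

import pendulum
from airflow import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

local_tz = pendulum.timezone("America/Chicago")

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
    'run_as_user': user_id  # user_id: not defined in the snippet shown here
}

dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
          catchup=False,
          max_active_runs=1,
          schedule_interval='0 * * * *',  # schedule_interval='@hourly'
          default_args=default_args
)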

def check_prev_dag_run_status(**kwargs):
    curr_dag_id = kwargs['dag'].dag_id
    curr_task_id = kwargs['task'].task_id
    newdate = kwargs['execution_date']
    ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
    state = ti.current_state()
    if state=="running":
        raise ValueError("Not all previous tasks successfully completed")
        
check_success_task = PythonOperator(
    task_id='check_status',
    python_callable=check_prev_dag_run_status,
    provide_context=True,
    dag=dag
)

run_this_0 = BashOperator(
    task_id='run_shell',
    bash_command="ksh runshellscript.ksh",
    execution_timeout=None,
    dag=dag
)

I keep getting this error:

[2020-11-17 12:30:07,337] {taskinstance.py:1150} ERROR - 'str' object has no attribute 'dag_id'

Traceback (most recent call last):
  File "/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_ca

Please advise:

  1. What am I missing when passing arguments to airflow.models.taskinstance.TaskInstance?
  2. Does execution_date give the immediately previous instance of the DAG run? If not, how do I get the state of the immediately previous DAG run?
execution_date gives you the execution date of the current DAG run, not the previous run; see also Airflow Scheduler. – Philipp Johannis
I am actually looking for what I am missing when passing arguments to airflow.models.taskinstance.TaskInstance. – Sandeep Kumar
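To make the first comment concrete: execution_date identifies the current run, and for a fixed hourly schedule the previous run's execution date is exactly one interval earlier. Below is a minimal pure-Python sketch, assuming a '0 * * * *' schedule with no missed runs and UTC timestamps (daylight-saving shifts in America/Chicago are ignored). The helper previous_execution_date is hypothetical, not an Airflow function; inside a task, Airflow 1.10 exposes the equivalent value in the context as prev_execution_date.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed hourly schedule ('0 * * * *') with no gaps between runs.
SCHEDULE_INTERVAL = timedelta(hours=1)


def previous_execution_date(execution_date: datetime) -> datetime:
    """Hypothetical helper: execution date of the run scheduled one interval earlier."""
    return execution_date - SCHEDULE_INTERVAL


current = datetime(2020, 11, 17, 2, 0, tzinfo=timezone.utc)
print(current.isoformat())                           # the current run
print(previous_execution_date(current).isoformat())  # the run before it
```

Crossing midnight works the same way, since timedelta arithmetic rolls the date back as well.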

2 Answers


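airflow.models.taskinstance.TaskInstance takes two required arguments, task and execution_date (plus an optional state), not the three values your code passes. Because the arguments are positional, your dag_id string is bound to task, and the constructor fails as soon as it reads task.dag_id; that is the 'str' object has no attribute 'dag_id' error. Additionally, task is not the task_id but the task object itself; in your example that is presumably run_this_0. You also need to pass the execution_date of the previous run, not the current one. Finally, a task that is not "running" can still be unsuccessful (for example "failed" or "upstream_failed"), so check for "success" instead.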
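The argument mix-up can be reproduced without Airflow. The sketch below uses two hypothetical stand-in classes (Task and SimplifiedTaskInstance are not Airflow classes); the stand-in only mirrors the (task, execution_date, state=None) signature described above and the fact that the constructor reads task.dag_id first.

```python
from datetime import datetime


class Task:
    """Hypothetical stand-in for an Airflow operator: only the attributes used below."""

    def __init__(self, dag_id: str, task_id: str):
        self.dag_id = dag_id
        self.task_id = task_id


class SimplifiedTaskInstance:
    """Hypothetical stand-in mirroring the (task, execution_date, state=None) signature."""

    def __init__(self, task, execution_date, state=None):
        # Reading attributes off `task` is what fails when a string is passed.
        self.dag_id = task.dag_id
        self.task_id = task.task_id
        self.execution_date = execution_date
        self.state = state


when = datetime(2020, 11, 17, 1, 0)

# Wrong: three positional values, so the dag_id string lands in `task`.
try:
    SimplifiedTaskInstance("my_dag", "check_status", when)
except AttributeError as err:
    print(err)  # 'str' object has no attribute 'dag_id'

# Right: pass the task object itself plus an execution date.
run_this_0 = Task("my_dag", "run_shell")
ti = SimplifiedTaskInstance(run_this_0, when)
print(ti.dag_id, ti.task_id)  # my_dag run_shell
```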

Putting all this together, the following code should check whether run_this_0 succeeded in the previous DAG run:

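from airflow.models.taskinstance import TaskInstance

def check_prev_dag_run_status(**kwargs):
    # Execution date of the previous scheduled run, taken from the task context
    newdate = kwargs['prev_execution_date']
    # Pass the task object (run_this_0), not its task_id
    ti = TaskInstance(run_this_0, newdate)
    state = ti.current_state()
    # Any state other than "success" (failed, running, None, ...) fails the check
    if state != "success":
        raise ValueError("Not all previous tasks successfully completed")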
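The important change from the question's code is the comparison: checking state == "running" lets failed or never-run instances through, while state != "success" rejects them. A small pure-Python sketch of that rule follows; the helper names are hypothetical, and the state list is assumed to be a subset of the strings defined in Airflow 1.10's airflow.utils.state.State, with None meaning the task instance has no state yet.

```python
# Assumed subset of Airflow 1.10 task-instance state strings; None = no state yet.
POSSIBLE_STATES = ["success", "running", "failed", "upstream_failed",
                   "up_for_retry", "skipped", None]


def previous_run_succeeded(state):
    """Hypothetical helper: only "success" counts as a finished, good run."""
    return state == "success"


def check_state(state):
    # Same rule as the answer's callable, factored out so it runs without Airflow.
    if not previous_run_succeeded(state):
        raise ValueError("Not all previous tasks successfully completed")


for s in POSSIBLE_STATES:
    print(repr(s), "->", "proceed" if previous_run_succeeded(s) else "fail the check")
```

Note that with this rule a skipped previous run also fails the check; whether that is desirable depends on the DAG.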

  1. Set task concurrency to 1 for this task (task_concurrency=1). A newer instance of the task will not run while a previous instance is still running.

  2. Set depends_on_past=True. The drawback is that if one batch fails, the following batches will not run.

  3. Use an ExternalTaskSensor with mode='reschedule' to wait for the earlier batch to complete.
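The drawback of depends_on_past can be illustrated with a toy model (not Airflow's scheduler): a task instance only runs if the previous run's instance succeeded or was skipped, so a single failure blocks every later run. The function simulate_depends_on_past is hypothetical, and treating the first run as always allowed is a simplifying assumption.

```python
def simulate_depends_on_past(outcomes):
    """Toy model of depends_on_past=True for one task across consecutive runs.

    outcomes[i] is the state run i would end in if it were allowed to execute.
    A run executes only if the previous run ended in "success" or "skipped";
    otherwise it never starts and keeps state None.
    """
    final_states = []
    for i, outcome in enumerate(outcomes):
        if i == 0 or final_states[i - 1] in ("success", "skipped"):
            final_states.append(outcome)
        else:
            final_states.append(None)  # blocked: never starts
    return final_states


print(simulate_depends_on_past(["success", "failed", "success", "success"]))
# ['success', 'failed', None, None]
```

The third and fourth runs would have succeeded, but they never start because the second run failed.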