I have a DAG that is scheduled to run every hour, say at 01:00, 02:00 and 03:00. When the 02:00 run is picked up, if the 01:00 DAG run is still in progress, I need to cancel the 02:00 instance.
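The gating rule described above can be sketched in plain Python. This sketch assumes a fixed hourly interval and uses a hypothetical `run_states` dict as a stand-in for Airflow's metadata database. It is not Airflow code, only the decision being asked for.

```python
from datetime import datetime, timedelta

# Fixed hourly interval, matching schedule_interval='0 * * * *'.
SCHEDULE_INTERVAL = timedelta(hours=1)


def should_cancel(current_run: datetime, run_states: dict) -> bool:
    """Return True if the run one interval earlier is still running.

    run_states is a hypothetical mapping of run time -> state string
    ("running", "success", "failed", ...).
    """
    previous_run = current_run - SCHEDULE_INTERVAL
    return run_states.get(previous_run) == "running"


states = {
    datetime(2020, 11, 17, 1, 0): "running",
    datetime(2020, 11, 17, 2, 0): "queued",
}
print(should_cancel(datetime(2020, 11, 17, 2, 0), states))  # True: 01:00 still running
print(should_cancel(datetime(2020, 11, 17, 3, 0), states))  # False: 02:00 is not running
```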
I am trying this code:
import os
from datetime import datetime

import pendulum
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

local_tz = pendulum.timezone("America/Chicago")
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
    'run_as_user': user_id  # user_id is defined elsewhere in the file (not shown)
}
dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
          catchup=False,
          max_active_runs=1,
          schedule_interval='0 * * * *',  # schedule_interval='@hourly'
          default_args=default_args
          )


def check_prev_dag_run_status(**kwargs):
    curr_dag_id = kwargs['dag'].dag_id
    curr_task_id = kwargs['task'].task_id
    newdate = kwargs['execution_date']
    ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
    state = ti.current_state()
    if state == "running":
        raise ValueError("Not all previous tasks successfully completed")


check_success_task = PythonOperator(
    task_id='check_status',
    python_callable=check_prev_dag_run_status,
    provide_context=True,
    dag=dag
)
run_this_0 = BashOperator(
    task_id='run_shell',
    bash_command="ksh runshellscript.ksh",
    execution_timeout=None,
    dag=dag
)
I have been getting this error message:

[2020-11-17 12:30:07,337] {taskinstance.py:1150} ERROR - 'str' object has no attribute 'dag_id'
Traceback (most recent call last):
  File "/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_ca
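The error message is consistent with the constructor signature in Airflow 1.10.x (which the `python_operator` path in the traceback suggests), `TaskInstance(task, execution_date, state=None)`: the first argument is expected to be a task object, and the constructor reads `task.dag_id` from it. The sketch below reproduces that with hypothetical stand-ins (`FakeTask`, `FakeTaskInstance`); they only mimic the start of the real constructor and are not Airflow classes.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator (only the ids matter here)."""

    def __init__(self, dag_id: str, task_id: str):
        self.dag_id = dag_id
        self.task_id = task_id


class FakeTaskInstance:
    """Hypothetical stand-in mimicking the start of Airflow 1.10's
    TaskInstance.__init__(task, execution_date, state=None)."""

    def __init__(self, task, execution_date, state=None):
        # The first argument must be a task object, not a dag_id string.
        self.dag_id = task.dag_id
        self.task_id = task.task_id
        self.execution_date = execution_date
        self.state = state


# Same shape as TaskInstance(curr_dag_id, curr_task_id, newdate):
# the dag_id string lands in the `task` parameter.
try:
    FakeTaskInstance("my_dag", "check_status", "2020-11-17T12:00:00")
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'dag_id'

# Passing a task object and the execution date works.
ti = FakeTaskInstance(FakeTask("my_dag", "check_status"), "2020-11-17T12:00:00")
print(ti.dag_id, ti.task_id)  # my_dag check_status
```

With the real class the equivalent call would presumably be `TaskInstance(kwargs['task'], newdate)`. Note that this still refers to the current `check_status` task in the current run, so its state would be the running check itself rather than anything from the previous hour's run.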
*Please advise:
- What am I missing when passing arguments to airflow.models.taskinstance.TaskInstance?
- Does execution_date refer to the immediately preceding DAG run? If not, how can I get the state of the immediately preceding DAG run?*
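As far as I understand Airflow's scheduling semantics, `execution_date` is the logical date of the current run (the start of its schedule interval), not of the previous run; with an hourly schedule, the run stamped 01:00 is typically started around 02:00. Finding the immediately preceding run then amounts to picking the latest run whose `execution_date` is earlier than the current one, which is roughly what `DagRun.get_previous_dagrun()` does in Airflow 1.10 (the `prev_execution_date` context variable is another option; both are worth checking against the docs for your version). The plain-Python sketch below shows only that lookup; the `runs` dict is a hypothetical stand-in for the recorded DAG runs.

```python
from datetime import datetime
from typing import Dict, Optional, Tuple


def get_previous_run(
    current: datetime, runs: Dict[datetime, str]
) -> Optional[Tuple[datetime, str]]:
    """Return (execution_date, state) of the latest run strictly before
    `current`, or None if there is no earlier run.

    `runs` is a hypothetical mapping of execution_date -> state.
    """
    earlier = [d for d in runs if d < current]
    if not earlier:
        return None
    prev = max(earlier)
    return prev, runs[prev]


runs = {
    datetime(2020, 11, 17, 1, 0): "running",
    datetime(2020, 11, 17, 2, 0): "running",
}
print(get_previous_run(datetime(2020, 11, 17, 2, 0), runs))
# (datetime.datetime(2020, 11, 17, 1, 0), 'running')
print(get_previous_run(datetime(2020, 11, 17, 1, 0), runs))  # None
```

Looking up the latest earlier run, rather than subtracting one hour, also covers manually triggered runs that do not fall on the hourly grid.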