I want to set the execution_date in a trigger DAG. I´m using the operator TriggerDagRunOperator, this operator have the parameter execution_date, I want to set the current execution_date.
def conditionally_trigger(context, dag_run_obj):
"""This function decides whether or not to Trigger the remote DAG"""
pp = pprint.PrettyPrinter(indent=4)
c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
print("Controller DAG : conditionally_trigger = {}".format(c_p))
if Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1":
pp.pprint(dag_run_obj.payload)
return dag_run_obj
default_args = {
'owner': 'pepito',
'depends_on_past': False,
'retries': 2,
'start_date': datetime(2018, 12, 1, 0, 0),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'DAG_1',
default_args=default_args,
schedule_interval="0 12 * * 1",
dagrun_timeout=timedelta(hours=22),
max_active_runs=1,
catchup=False
)
trigger_dag_2 = TriggerDagRunOperator(
task_id='trigger_dag_2',
trigger_dag_id="DAG_2",
python_callable=conditionally_trigger,
execution_date={{ execution_date }},
dag=dag,
pool='a_roz'
)
But I obtain the next error
name 'execution_date' is not defined
If I set
execution_date={{ 'execution_date' }},
or
execution_date='{{ execution_date }}',
I obtain
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 78, in execute
replace_microseconds=False)
File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 98, in trigger_dag
replace_microseconds=replace_microseconds,
File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 45, in _trigger_dag
assert timezone.is_localized(execution_date)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py", line 38, in is_localized
return value.utcoffset() is not None
AttributeError: 'str' object has no attribute 'utcoffset'
Does anyone know how I can set the execution date for DAG_2 if I want to be equal to DAG_1?
This question is diferent to airflow TriggerDagRunOperator how to change the execution date because In this post didn't explain how to send the execution_date through the operator TriggerDagRunOperator, in it is only said that the possibility exists. https://stackoverflow.com/a/49442868/10269204