
I want to set the execution_date of a triggered DAG. I'm using TriggerDagRunOperator, which has an execution_date parameter, and I want to pass it the execution_date of the current DAG run.

import pprint
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    pp = pprint.PrettyPrinter(indent=4)
    # Trigger only when VAR1 and VAR2 match and VAR3 is "1"
    c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if c_p:
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj

default_args = {
    'owner': 'pepito',
    'depends_on_past': False,
    'retries': 2,
    'start_date': datetime(2018, 12, 1, 0, 0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'DAG_1',
    default_args=default_args,
    schedule_interval="0 12 * * 1",
    dagrun_timeout=timedelta(hours=22),
    max_active_runs=1,
    catchup=False
)

trigger_dag_2 = TriggerDagRunOperator(
    task_id='trigger_dag_2',
    trigger_dag_id="DAG_2",
    python_callable=conditionally_trigger,
    execution_date={{ execution_date }},
    dag=dag,
    pool='a_roz'
)

But I get the following error:

name 'execution_date' is not defined

If I set

execution_date={{ 'execution_date' }},

or

execution_date='{{ execution_date }}',

I get:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 78, in execute
    replace_microseconds=False)
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 98, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 45, in _trigger_dag
    assert timezone.is_localized(execution_date)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py", line 38, in is_localized
    return value.utcoffset() is not None
AttributeError: 'str' object has no attribute 'utcoffset'
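
The last frames of the traceback show what goes wrong: the check calls utcoffset() on whatever it receives, so a plain string fails before the value is ever parsed. A minimal stdlib-only sketch of that behaviour, assuming the check is exactly the one-liner shown in the traceback (the function below is a local stand-in, not an import from Airflow):

```python
from datetime import datetime, timezone


def is_localized(value):
    """Local stand-in for the check in the traceback: True for aware datetimes."""
    return value.utcoffset() is not None


naive = datetime(2018, 12, 3, 12, 0)                       # no tzinfo
aware = datetime(2018, 12, 3, 12, 0, tzinfo=timezone.utc)  # tzinfo set

naive_ok = is_localized(naive)  # utcoffset() is None for a naive datetime
aware_ok = is_localized(aware)  # utcoffset() is a timedelta for an aware one

# A string (for example an unrendered or unparsed template value) has no
# utcoffset() method at all, which is the AttributeError reported above.
try:
    is_localized("2018-12-03T12:00:00+00:00")
    string_error = None
except AttributeError as exc:
    string_error = str(exc)

print(naive_ok, aware_ok, string_error)
```

So the value that reaches this check has to be a timezone-aware datetime object, not a string and not a naive datetime.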

Does anyone know how I can set the execution date of DAG_2 so that it is equal to that of DAG_1?

This question is different from "airflow TriggerDagRunOperator how to change the execution date" because that post doesn't explain how to pass the execution_date through TriggerDagRunOperator; it only says that the possibility exists. https://stackoverflow.com/a/49442868/10269204


1 Answer


The execution_date parameter was not templated previously, but it is templated now with this commit.

You can try your code with a newer version of Airflow.

Additionally, for a hardcoded execution_date, you need to set tzinfo:

from datetime import datetime, timezone
execution_date=datetime(2019, 3, 27, tzinfo=timezone.utc)
# or, for the current time in UTC:
execution_date=datetime.now(timezone.utc)
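
One thing to watch when building a timezone-aware value: replace(tzinfo=...) only relabels a datetime, while astimezone() (or datetime.now(timezone.utc)) actually converts it. A small sketch of the difference, using a hypothetical UTC+2 zone chosen purely for illustration:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical local zone (UTC+2), used only to make the example deterministic.
local_tz = timezone(timedelta(hours=2))
local_noon = datetime(2019, 3, 27, 12, 0, tzinfo=local_tz)

# astimezone() converts the instant: 12:00 at UTC+2 is 10:00 UTC.
as_utc = local_noon.astimezone(timezone.utc)

# replace() keeps the wall-clock fields and swaps the label: 12:00 UTC,
# which is a different instant, two hours later than the real one.
relabelled = local_noon.replace(tzinfo=timezone.utc)

# For "now", asking for UTC directly avoids the relabelling problem.
now_utc = datetime.now(timezone.utc)

print(as_utc.isoformat())
print(relabelled.isoformat())
print(relabelled - as_utc)
```

If the server's local time is already UTC the two approaches give the same result, which is why the relabelling form often goes unnoticed.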