I have a project that started in 2018 and to which I regularly add new DAGs. Hence, all my DAGs have a `start_date` in 2018 and a daily `schedule_interval`, but `catchup` set to False, because when I add a new DAG now I do not want it to run for every day since 2018 (for now; maybe I will have to run it for all those days later).
However, most of the time I do want it to run for some weeks before the date I added it.
I expected the DAG runs between the `start_date` and the `added_date` (the date when I added the DAG) to appear in the DAG Tree View UI as white circles, so that I could trigger them manually for the last two weeks.
But nothing appears in this view...
So I ran a backfill manually (from the command line... a backfill interface in the UI would be nice), but none of the runs executed by the backfill appears in the UI. Hence, if one run of the backfill fails, I still cannot re-run it from the UI.
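For reference, the command-line backfill I used looks roughly like this (the DAG id and dates are placeholders; on Airflow 1.10 the subcommand is `airflow backfill`, on 2.x it is `airflow dags backfill`):

```shell
# Backfill two weeks of daily runs for a DAG (placeholder id "my_dag").
airflow backfill -s 2020-10-01 -e 2020-10-14 my_dag

# Or mark the runs as successful without actually executing the tasks.
airflow backfill -s 2020-10-01 -e 2020-10-14 -m my_dag
```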
Is "not showing possible DAG runs between `start_date` and `added_date`" the intended behavior of Airflow? Is there any way to overcome this? Or is there a better way to handle this use case: "Add a DAG, and manually run it for some dates in the past"?
[Edit] Programmatic dagrun failing
As proposed by Philipp, a solution can be to turn catchup on, and mark all runs between `start_date` and `add_date` as success (or failure, whatever).
I ended up with something like this:
```python
import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State

start_date = datetime.datetime(2020, 10, 1, tzinfo=datetime.timezone.utc)
add_date = datetime.datetime.now(datetime.timezone.utc)

# Define a DAG with just one task
with DAG("test_catchup", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
    BashOperator(
        task_id="print",
        bash_command="echo {{ ds }}",
    )

# For all dates between start_date and add_date, create a dagrun and mark it as failed.
for d in [start_date + datetime.timedelta(n) for n in range(int((add_date - start_date).days))]:
    print("Create dagrun for", d.isoformat())
    try:
        dag.create_dagrun(
            run_id="programmatic_fill_{}".format(d.isoformat()),
            state=State.FAILED,
            execution_date=d,
            start_date=add_date,
            external_trigger=False,
        )
    except Exception as e:
        print("Error:", e)
```
As you can see, you first have to nest the dagrun creation in a try-except block, because each time Airflow parses this file it will try to add the same entries to the dagrun table and fail with primary-key conflicts.
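Instead of relying on the exception, one could first determine which run_ids already exist and only create the missing ones. A pure-Python sketch of that filtering (the `existing` set is a stand-in for a query against the dagrun table, which I have not shown):

```python
import datetime

start_date = datetime.datetime(2020, 10, 1, tzinfo=datetime.timezone.utc)
add_date = datetime.datetime(2020, 10, 5, tzinfo=datetime.timezone.utc)

# Stand-in for "run_ids already present in the dagrun table".
existing = {"programmatic_fill_2020-10-01T00:00:00+00:00"}

def missing_dates(start, end, existing_run_ids):
    """Yield execution dates whose deterministic run_id is not present yet."""
    for n in range(int((end - start).days)):
        d = start + datetime.timedelta(n)
        run_id = "programmatic_fill_{}".format(d.isoformat())
        if run_id not in existing_run_ids:
            yield d

dates = list(missing_dates(start_date, add_date, existing))
# 2020-10-01 is skipped because its run_id already exists.
```

Because the run_ids are deterministic, re-parsing the file becomes idempotent without catching primary-key errors.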
This roughly works. All my dagruns appear.
However, I cannot (re-)run any of them. When clearing one, I get the following error:
No task instances to clear
I managed to mark one as success (which turns the circle and the square green), then clear it, which turns the DAG (circle) to the running state but turns the task to the `None` state, and it never executes...
[Edit] Latest Only Operator
Following another great idea from Philipp, I gave the `LatestOnlyOperator` a try.
```python
import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

start_date = datetime.datetime(2020, 10, 1)

with DAG("test_latest_only", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
    LatestOnlyOperator(
        task_id="latest_filter"
    ) >> BashOperator(
        task_id="print",
        bash_command="echo {{ ds }}",
    )
```
And the result (I have already manually re-run the first dagrun):
Pros:
- It achieves what I am trying to do
Cons:
- It requires one more operator
- It is slow to bootstrap: it took about 5 minutes to run my 12 DAGs, even though each one stops at the first task (should I use it to "backfill" 2 years of daily jobs?)
- You must not clear the whole DAG but only the first task below the `LatestOnlyOperator`, or it will keep preventing the execution of the downstream tasks.
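For context, the operator's skip decision is roughly "let downstream run only if now falls inside the latest schedule interval". A simplified pure-Python sketch of that check (my own illustration of the idea, not the operator's actual source):

```python
import datetime

def is_latest(execution_date, interval, now):
    """Simplified LatestOnly check: a daily run stamped `execution_date`
    covers the window [execution_date + interval, execution_date + 2*interval);
    only the run whose window contains `now` lets downstream tasks execute."""
    left = execution_date + interval
    right = left + interval
    return left < now <= right

day = datetime.timedelta(days=1)
now = datetime.datetime(2020, 10, 15, 12, 0)

# A backfilled run from two weeks ago: downstream is skipped.
old = is_latest(datetime.datetime(2020, 10, 1), day, now)      # False
# The most recently completed interval: downstream runs.
latest = is_latest(datetime.datetime(2020, 10, 14), day, now)  # True
```

This explains both the bootstrap cost (every historical run still has to be scheduled and evaluated) and why clearing the filter task itself re-applies the skip.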
Finally, this operator seems to be an old trick predating the availability of the `catchup=False` option, and I am not sure about its sustainability, as deprecating it has already been discussed.
Comments:

- […] `catchup` to True. But then using `airflow backfill` with the `-m` flag to mark all runs as successful without executing them. Documentation of Airflow backfill can be found here. – Philipp Johannis
- […] `LatestOnlyOperator` to achieve this. See this question. – Philipp Johannis