
I have a project that started in 2018, but I still add new DAGs to it regularly. Hence, all my DAGs have a start_date in 2018 and a daily schedule_interval, but catchup set to False, because when I add a new DAG now I do not want it to run for every day since 2018 (for now; maybe I will have to run it for all those days later).

However, most of the time I want a new DAG to run for a few weeks before the date I added it. I expected the DAG runs between start_date and the added_date (the date when I added the DAG) to appear in the Tree View as white circles, so that I could trigger them manually for the last two weeks. But nothing appears in this view...

So I ran a backfill manually (from the command line... a backfill interface in the UI would be nice), but none of the runs executed by the backfill appears in the UI. Hence, if one run of the backfill fails, I still cannot re-run it from the UI.
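
For reference, a manual backfill from the command line looks roughly like this (Airflow 1.x CLI syntax; the dates and DAG id are just placeholders):

airflow backfill -s 2020-10-19 -e 2020-11-02 my_new_dag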

Is "not showing possible dag-runs between start_date and added_date." the intended behavior of Airflow ? Is there any way to overcome this ? Or is there a better way to handle this use case: "Add a DAG, and manually run it for some dates in the past."


[Edit] Programmatic DAG run creation failing

As proposed by Philipp, one solution can be to turn catchup on and mark all runs between start_date and add_date as success (or failure, whatever).
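
From the command line, this marking can be done with the backfill command and its -m/--mark_success flag (dates and DAG id are placeholders):

airflow backfill -m -s 2018-01-01 -e 2020-11-01 my_new_dag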

I ended up with something like this:

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State


start_date = datetime.datetime(2020, 10, 1, tzinfo=datetime.timezone.utc)
add_date = datetime.datetime.now(datetime.timezone.utc)

# Define a DAG with just one task
with DAG("test_catchup", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
    BashOperator(
            task_id="print",
            bash_command="echo {{ ds }}"
            )

    # For all dates between start_date and add_date, create a dagrun and mark it as failed.
    for d in [start_date + datetime.timedelta(n) for n in range(int((add_date - start_date).days))]:
        print("Create dagrun for ", d.isoformat())
        try:
            dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()),
                              state=State.FAILED,
                              execution_date=d,
                              start_date=add_date,
                              external_trigger=False)
        except Exception as e:
            # Airflow re-parses this file regularly and tries to insert the
            # same runs again, which fails with a primary key conflict.
            print("Error:", e)

As you can see, you first have to nest the create_dagrun call in a try/except block: each time Airflow parses this file it tries to insert the same entries into the DAG run table again and fails with a primary key conflict.
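
To avoid the exception handling, you can check whether a run already exists before creating it (this is only a sketch of the loop, using the same variables as above; the same idea is used in the answer below):

    for d in [start_date + datetime.timedelta(n) for n in range(int((add_date - start_date).days))]:
        # Only create the run if it is not already in the metadata database
        if dag.get_dagrun(execution_date=d) is None:
            dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()),
                              state=State.FAILED,
                              execution_date=d,
                              start_date=add_date,
                              external_trigger=False)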

This roughly works. All my dagruns appear:

All dagruns appearing in UI

However, I cannot (re-)run any of them. When I clear one, I get the following error:

No task instances to clear

I managed to mark one as success (which turns both the circle and the square green), then cleared it, which turns the DAG run (circle) to the running state but sets the task to the None state, and it never executes...


[Edit] Latest Only Operator

Following another great idea from Philipp, I gave the LatestOnlyOperator a try.

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator


start_date = datetime.datetime(2020, 10, 1)
with DAG("test_latest_only", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
    LatestOnlyOperator(
            task_id="latest_filter"
    ) >> BashOperator(
            task_id="print",
            bash_command="echo {{ ds }}"
    )

And the result (I had already manually re-run the first DAG run):

Latest and arbitrary past dag runs successful

Pros:

  • It achieves what I am trying to do

Cons:

  • It requires one more operator
  • It is slow to bootstrap: it took about 5 minutes to run my 12 DAG runs, even though each one stops at the first task (should I really use it to "backfill" 2 years of daily jobs?)
  • You must not clear the whole DAG run but only the first task below the LatestOnlyOperator (see the command sketch below), otherwise the LatestOnlyOperator will keep skipping the downstream tasks.
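
For example, clearing only that downstream task from the command line might look like this (DAG and task ids taken from the example above, dates are placeholders):

airflow clear -t print -s 2020-10-01 -e 2020-10-14 test_latest_only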

Finally, this operator looks like an old trick from before the catchup=False option was available, and I am not sure about its future, as deprecating it has already been discussed.

A bad and crude way: you can update the schedule from the back end (update the entry in the DB). – sandeep rawat
@sandeeprawat How do you achieve this? From the CLI? With which command? – AlexisBRENON
Why don't you set catchup to True, and then use airflow backfill with the -m flag to mark all runs as successful without executing them? Documentation of the Airflow backfill command can be found here. – Philipp Johannis
@PhilippJohannis Good idea. Is there a way to run the backfill command automatically? This would avoid having to connect to the Airflow host and run the backfill manually (which, if I forget, would waste a lot of time and resources catching up a useless DAG). Something at the end of the DAG declaration that interacts with Airflow? – AlexisBRENON
Hmm, you may be able to use it in combination with the LatestOnlyOperator to achieve this. See this question. – Philipp Johannis

1 Answer


I finally managed to find a suitable solution, inspired by the "Programmatic DAG run" idea.

The error message stated that there are No task instances to clear. So the solution is to create the task instances at the same time as the DAG run.

Here is my working solution:

import datetime

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State
from airflow.utils.timezone import parse

start_date = parse(datetime.datetime(2020, 11, 1).isoformat())
add_date = datetime.datetime(2020, 11, 8, tzinfo=datetime.timezone.utc)

# Create your dag
with DAG("test_task_instance", schedule_interval="@daily", catchup=False, start_date=start_date) as dag:
    # And its tasks
    BashOperator(
            task_id="print",
            bash_command="echo '{{ ti }}' > /tmp/{{ ds }}"
            )

    # For each expected execution date
    for d in dag.date_range(dag.start_date, end_date=add_date)[:-1]:
        # If no DAG run already exists
        if dag.get_dagrun(d) is None:
            # Create a new one (marking the state as SKIPPED)
            dagrun = dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()),
                                       state=State.SKIPPED,
                                       execution_date=d,
                                       start_date=add_date,
                                       external_trigger=False)
            # And create task instance for each task for this DAG run
            for t in dag.tasks:
                TaskInstance(task=t, execution_date=dagrun.execution_date)

As you can see, no try...except block is required anymore, since we check whether a run already exists before creating a new one. And the SKIPPED state seems more appropriate for these DAG runs, as nothing was actually run.

Then, look at your DAG in the Airflow UI:

All dags are marked as failed, tasks have no state

And finally, you can cherry-pick which DAG runs to execute by clearing them and letting the scheduler do its job.
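
Clearing can be done from the UI, or from the command line for a given date (DAG id taken from the example above, the date is a placeholder):

airflow clear -s 2020-11-03 -e 2020-11-03 test_task_instance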

Scheduler queued tasks

Selected DAG runs completed successfully


Edit:

I edited the code above because it was superseding the scheduler, creating the "catch up" DAG runs before the scheduler could create them itself.

The current solution, which is not the best in my opinion, is to drop the last expected execution date:

- for d in dag.date_range(dag.start_date, end_date=add_date):
+ for d in dag.date_range(dag.start_date, end_date=add_date)[:-1]: