
Is there a way to stop an Airflow DAG if enough of certain tasks fail? E.g., I have a collection of tasks that all do the same thing for different values:

for dataset in list_of_datasets:
    task_1 = BashOperator(task_id="task_1_%s" % dataset["id"], ...)
    task_2 = BashOperator(task_id="task_2_%s" % dataset["id"], ...)
    task_3 = BashOperator(task_id="task_3_%s" % dataset["id"], ...)
    task_1 >> task_2 >> task_3

If, say, any 5 instances of task_2 fail, then something bigger is wrong with the underlying process used by task_2 (as opposed to a problem with the individual dataset being processed in that particular task instance), and that task is unlikely to succeed for any other instance. In that case the whole DAG should stop, or skip to a later or alternative-branch task.

Is there a way to enforce this by setting something in the task declarations? Are there any other common workarounds for this kind of situation?

You can use trigger rules to set the dependencies you require. Generally, though, if your task still fails after the number of retries you specified, the DAG run fails (and, by default, so do all downstream tasks). - manesioz
@manesioz My problem with trigger rules is that there is no "some_failed" kind of trigger rule, and the tasks in question do not retry. In my case, if they fail on an individual level there is usually some uncaught data problem that I want to investigate, not something that would benefit from trying again. (I updated the question to hopefully clear up this confusion.) - lampShadesDrifter

1 Answer


One approach may be to slip in a BranchPythonOperator or ShortCircuitOperator and put the trigger logic into the operator's python_callable argument. You can always query the airflow.models.TaskInstance SQLAlchemy model, which stores the attributes of the task instances for all DAG runs; the get_task_instances() method on the dag object returns these task instances. (As an aside, the dag object can be accessed via the context, which is passed to the python_callable.)
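A minimal sketch of that idea, assuming Airflow 2.x, where the context is passed to python_callable as keyword arguments and context["dag_run"] is the current DagRun. It calls dag_run.get_task_instances() rather than the dag-level method so that only failures from the current run are counted (the dag-level method filters by date range, not by run). The threshold of 5, the "task_2_" prefix, and the two branch task ids are hypothetical placeholders taken from or invented for the question. The counting logic lives in a plain function so it can be checked without Airflow installed.

```python
from typing import Any, Iterable, List, Optional, Tuple

# Hypothetical values: the question says "any 5 instances of task_2",
# and its task ids follow the pattern "task_2_<dataset id>".
FAILURE_THRESHOLD = 5
TASK_PREFIX = "task_2_"

# Hypothetical task ids for the two paths a BranchPythonOperator could pick.
CONTINUE_TASK_ID = "continue_processing"
ALTERNATIVE_TASK_ID = "handle_task_2_outage"


def count_failed(states: Iterable[Tuple[str, Optional[str]]], prefix: str = TASK_PREFIX) -> int:
    """Count (task_id, state) pairs whose task_id starts with prefix and whose state is 'failed'."""
    return sum(1 for task_id, state in states if task_id.startswith(prefix) and state == "failed")


def _current_run_states(context: dict) -> List[Tuple[str, Optional[str]]]:
    """Collect (task_id, state) for every task instance in the current DAG run.

    Assumes context["dag_run"] is a DagRun whose get_task_instances() returns
    TaskInstance objects exposing .task_id and .state (state may be None
    for instances that have not run yet).
    """
    return [(ti.task_id, ti.state) for ti in context["dag_run"].get_task_instances()]


def should_continue(**context: Any) -> bool:
    """python_callable for a ShortCircuitOperator: a falsy return skips downstream tasks."""
    return count_failed(_current_run_states(context)) < FAILURE_THRESHOLD


def choose_branch(**context: Any) -> str:
    """python_callable for a BranchPythonOperator: return the task_id to follow."""
    if count_failed(_current_run_states(context)) >= FAILURE_THRESHOLD:
        return ALTERNATIVE_TASK_ID
    return CONTINUE_TASK_ID
```

To wire it in, the check task would presumably sit downstream of every task_2_* task with trigger_rule="all_done", so it runs once they have all finished regardless of outcome, e.g. ShortCircuitOperator(task_id="check_task_2_failures", python_callable=should_continue, trigger_rule="all_done"). Note that this makes every later task wait for all task_2 instances, which changes the per-dataset chains in the question; whether that is acceptable depends on the pipeline.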