7 votes

I have a SubDAG in Airflow with a long-running step (typically about 2 hours, though it varies based on which unit is being run). Under 1.7.1.3, this step would consistently trigger AIRFLOW-736 and the SubDAG would stall in the 'running' state even when all steps within it had succeeded. Since we had no steps downstream of the SubDAG, we could work around this by manually marking the SubDagOperator as successful (rather than running) in the database.
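
For reference, that manual workaround amounted to an update of the stuck SubDagOperator's row in the metadata database, along the lines of the sketch below (a hedged sketch only; the dag_id and task_id match the sample DAG further down, and in practice you would also filter on the execution_date of the stuck run):

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

# Sketch of the manual workaround: flip the stuck SubDagOperator's
# task_instance row from 'running' to 'success'.
session = settings.Session()
ti = session.query(TaskInstance).filter(
    TaskInstance.dag_id == 'long_runner',        # dag_id of the parent DAG
    TaskInstance.task_id == 'long_runner_sub',   # task_id of the SubDagOperator
    TaskInstance.state == State.RUNNING,
).first()
if ti is not None:
    ti.state = State.SUCCESS
    session.commit()
session.close()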

We're testing Airflow 1.8.1 now, upgrading by doing the following (the corresponding commands are sketched after this list):

  1. Shutting down our scheduler and workers
  2. Via pip, uninstalling airflow and installing apache-airflow (version 1.8.1)
  3. Running airflow upgradedb
  4. Running the airflow scheduler and workers
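
Concretely, steps 2 and 3 were roughly the following commands (install method and pinned version may differ in your environment):

pip uninstall airflow
pip install apache-airflow==1.8.1
airflow upgradedb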

With the system otherwise untouched, the same DAG is now failing 100% of the time, roughly when the long-running task hits the 1 hour mark (though oddly, not exactly 3600 seconds in - it can be anywhere from 30 to 90 seconds after the hour ticks over), with the message "Executor reports task instance finished (failed) although the task says its running. Was the task killed externally?". However, the task itself continues running on the worker unabated. Somehow the scheduler is mistaken in thinking the task failed (see this line of jobs.py), based on the state in the database, even though the actual task is running fine.

I've confirmed that, somehow, the state is 'failed' in the task_instance table of the airflow database. Thus, I'd like to know what could be setting the task state to failed when the task itself is still running.

Here's a sample DAG which triggers the issue:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}

def define_sub(dag, step_name, sleeptime):
    # Single step in the SubDAG: a long-running sleep, routed to the 'model' queue
    op = BashOperator(
        task_id=step_name, bash_command='sleep %i' % sleeptime, queue="model", dag=dag
    )
    return dag

def gen_sub_dag(parent_name, step_name, sleeptime):
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

# 7500 seconds (~2 hours) comfortably exceeds the 1 hour mark at which the failure appears
long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
Today I ran into the same issue: a SubDAG with one long-running task, and after a bit more than an hour I got said error message. Interestingly, the scheduler tried to restart the task, which failed due to a blocked resource outside of Airflow. The original task continued to run and ended correctly, but Airflow marked the SubDAG as failed before the task ended. – Alexander Köb
What executor are you using? Is it Celery + Redis? – Ash Berlin-Taylor

1 Answer

1 vote

If you are indeed running with Celery and Redis, have a look at the visibility_timeout setting for Celery and increase it beyond the expected end time of your task. With the Redis transport it defaults to 3600 seconds (one hour), which lines up with tasks being reported as failed shortly after the one-hour mark.
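
With the Redis broker this is set through Celery's broker transport options; a minimal sketch follows (where exactly it goes depends on how your Airflow deployment configures Celery, and the 18000-second value is just an example comfortably above a ~2 hour task):

# Celery configuration sketch: raise the Redis visibility timeout so it
# exceeds the longest expected task runtime (18000 s here is an assumption)
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000}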

Although we configure Celery with task_acks_late, it still has issues with tasks disappearing. We consider this a bug in Celery.