2 votes

I have 2 files inside dags directory - dag_1.py and dag_2.py

dag_1.py creates a static DAG and dag_2.py creates dynamic DAGs based on external json files at some location.

The static DAG (created by dag_1.py) contains a task at a later stage which generates some of these input json files for dag_2.py and dynamic DAGs are created in this manner.
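For context, a dag_2.py in a setup like this typically loops over the JSON files and registers one DAG per file at module level, so the DagFileProcessor can discover and serialize them. A minimal sketch, assuming a hypothetical config directory and JSON keys (`CONFIG_DIR`, `"dag_id"`, `"schedule"` are all illustrative names, and the real `DAG(...)` construction is elided to a placeholder dict):

```python
# Hypothetical sketch of a dag_2.py-style generator: scan a directory of
# JSON config files and expose one DAG per file at module level.
import glob
import json
import os

CONFIG_DIR = "/path/to/dag_configs"  # assumed location of the JSON files


def load_dag_configs(config_dir):
    """Return the parsed config from every *.json file in config_dir."""
    configs = []
    for path in sorted(glob.glob(os.path.join(config_dir, "*.json"))):
        with open(path) as f:
            configs.append(json.load(f))
    return configs


def register_dags(configs, namespace):
    """Create one DAG per config and bind it to a module-level name so the
    DagFileProcessor can find (and serialize) it."""
    for cfg in configs:
        # In a real dag_2.py this would be something like:
        #   dag = DAG(dag_id=cfg["dag_id"], schedule_interval=cfg["schedule"], ...)
        dag = {"dag_id": cfg["dag_id"], "schedule": cfg.get("schedule")}
        namespace[cfg["dag_id"]] = dag


register_dags(load_dag_configs(CONFIG_DIR), globals())
```

The important property for this question is the timing: the dynamic DAGs only exist once the JSON files exist *and* the DagFileProcessor has re-parsed this file and written rows to the serialized_dag table.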

This used to work fine with Airflow 1.x versions, where DAG Serialization was not used, but with Airflow 2.0 DAG Serialization has become mandatory. Sometimes I get the following exception in the Scheduler when dynamic DAGs are spawned:

[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table

After this, the scheduler gets terminated, which is expected. When I check the table manually after this error, I can see the DAG entry in it.

This issue is not reproducible all the time. What could be causing this? Is there any Airflow configuration I should try tweaking?

Tough to say without seeing your DAG - kaxil
Does the DAG structure really matter here? On restarting the scheduler, I can run the same dynamically spawned DAG. I wanted to know if this has something to do with concurrency between DAG parsing and scheduling. Since I can see the DAG entry in the serialized_dag table manually, is it possible that this entry was added to the table by the DagFileProcessor only after the scheduler had failed to find it? - Divyansh Jamuaar
As a quick fix, you can run `airflow db init` once again; I think it fixes the DB state and the scheduler starts running. - ofnowhere

3 Answers

3 votes

I fixed this issue in https://github.com/apache/airflow/pull/13893, which will be released as part of Airflow 2.0.1.

Airflow 2.0.1 will be released next week (8 Feb 2021, most likely).

5 votes

We had the same issue after updating in the following order:

  1. 1.10.12 -> 1.10.14
  2. 1.10.14 -> 2.0.0

I followed their guide through, and we had no issues until, at some random point a few hours in, the scheduler started crashing, complaining about random DAGs not being found in the database.

Our deployment procedure involves clearing out the /opt/airflow/dags folder and doing a clean install every time (we store DAGs and supporting code in Python packages).

So every now and then on 1.10.x we had cases where the scheduler parsed an empty folder and wiped the serialized DAGs from the database, but it was always able to restore the picture on the next parse.

Apparently in 2.0, as part of the effort to make the scheduler highly available, they fully separated the DAG processor and the scheduler, which leads to a race condition:

  • if the scheduler job hits the database before the DAG processor has updated the serialized_dag table, it finds nothing and crashes
  • if luck is on your side, the above does not happen and you won't see this exception

To get rid of this problem, I disabled scheduling of all DAGs by updating is_paused in the database, restarted the scheduler, and once it had generated the serialized DAGs, turned all DAGs back on.
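The pause/unpause workaround above can be sketched roughly like this. This is a minimal illustration assuming direct SQL access to the metadata database: the `dag` table and `is_paused` column match Airflow's schema, but the SQLite connection is purely for demonstration (a real metadata DB is usually Postgres or MySQL), and `airflow dags pause <dag_id>` / `airflow dags unpause <dag_id>` is the supported per-DAG alternative to raw SQL:

```python
# Hedged sketch of the workaround: flip is_paused for every DAG directly in
# the metadata database, restart the scheduler, then unpause. SQLite is used
# here only for illustration.
import sqlite3


def set_all_dags_paused(conn, paused):
    """Set is_paused on every row of the dag table in the metadata DB."""
    conn.execute("UPDATE dag SET is_paused = ?", (1 if paused else 0,))
    conn.commit()


# Workaround sequence:
#   1. set_all_dags_paused(conn, True)   # stop the scheduler picking DAGs up
#   2. restart the scheduler and wait for serialized_dag to be repopulated
#   3. set_all_dags_paused(conn, False)  # turn everything back on
```

Pausing everything first means the scheduler has nothing to schedule while the DAG processor repopulates serialized_dag, so it cannot hit the race window.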

0 votes

Not enough rep to comment, so having to leave an answer, but:

  1. Is this a clean 2.0 install or an upgrade of your old 1.10.x instance? And
  2. are you recycling the names?

I literally just hit this problem (I found this question googling to see who else was in the same boat).

In my case, it's an upgraded existing 1.10.x install, and although the DAGs were generated dynamically, the names were recycled. I was getting errors clicking on the DAG in the GUI, and it was killing the scheduler.

Turns Out(TM), deleting the DAGs entirely using the 'trashcan' button in the GUI overview and letting them regenerate fixed it (as in, the problem immediately went away and hasn't recurred in the last 30 minutes).

At a guess, it smells to me like some aspect of the dynamic DAGs wasn't properly migrated in the DB upgrade step, and wiping them out and letting them fully regenerate fixed the problem. Obviously you lose all your history etc., but (in my case at least) that's not necessarily a big deal.