1
votes

I was attempting to add a new DAG to our Google Cloud Composer instance (we currently have 32+ DAGs), but following the usual steps in https://cloud.google.com/composer/docs/how-to/using/managing-dags doesn't appear to have any effect. The new DAGs don't show up in the webserver/UI, and I see no sign that they are being loaded. The logs show them being copied to the appropriate bucket, but nothing beyond that.

I even tried setting a dummy environment variable to kick off a full restart of the Composer instance but to no avail.

Finally I've put together an entirely stripped down DAG and attempted to add it. Here is the DAG:

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = models.Dag(
    dag_id="test-dag",
    schedule_interval=None,
    start_date=datetime(2020, 3, 9),
    max_active_runs=1,
    catchup=False,
)

task_test = DummyOperator(dag=dag, task_id="test-task")

Even this simple DAG isn't getting picked up, so I'm wondering what I can try next. I looked through https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg to see whether there was anything I might tweak, such as DagBag loading time limits, but nothing jumps out. Totally stumped here.


2 Answers

2
votes

Your example is not picked up by my environment either. However, I tried the following format and it was picked up without issues:

from airflow import DAG
from datetime import datetime
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

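# The context manager assigns the DAG to operators created inside the block,
# so passing dag=dag is optional here.
with DAG(
    "my-test-dag",
    schedule_interval=None,
    start_date=datetime(2020, 3, 9),
    max_active_runs=1,
    catchup=False,
) as dag:
    task_test = DummyOperator(dag=dag, task_id="my-test-task")

-1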
votes

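We eventually figured out the issue. The class in airflow.models is named DAG, not Dag, and Python names are case-sensitive, so this line: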

dag = models.Dag(

should be

dag = models.DAG(
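
For anyone hitting a similar silent failure, one way to surface this kind of mistake is to execute the DAG file directly and look at the exception. The sketch below is only an illustration: load_dag_file is a hypothetical helper, not part of Airflow or Composer, and it assumes Airflow is installed in the local Python environment so that the file's imports resolve. Run against the DAG from the question, it would be expected to report an AttributeError for models.Dag. Note that the question's DAG also never imports datetime, so a NameError would follow once the class name is fixed.

```python
import importlib.util
import traceback
from pathlib import Path
from typing import Optional


def load_dag_file(path: str) -> Optional[str]:
    """Execute a Python file as a module.

    Hypothetical helper (not an Airflow API). Returns None if the file
    executes cleanly, otherwise the formatted traceback as a string.
    """
    # Build an import spec straight from the file path; the module name
    # is only a label here, so the file stem is good enough.
    spec = importlib.util.spec_from_file_location(Path(path).stem, path)
    if spec is None or spec.loader is None:
        return f"cannot build an import spec for {path}"

    module = importlib.util.module_from_spec(spec)
    try:
        # Runs the file's top-level code, which is where DAG objects
        # are constructed and where typos such as models.Dag blow up.
        spec.loader.exec_module(module)
    except Exception:
        return traceback.format_exc()
    return None
```

Calling load_dag_file("test_dag.py") (the file name is an assumption) and printing the result shows the traceback locally instead of waiting for the scheduler to skip the file.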