This is similar to previous questions, but none of the answers given there worked. I have a DAG:

import datetime
import os

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators.bash_operator import BashOperator

PROJECT = os.environ['PROJECT']
GCS_BUCKET = os.environ['BUCKET']
API_KEY = os.environ['API_KEY']

default_args = {
    'owner': 'me',
    'start_date': datetime.datetime(2019, 7, 30),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': datetime.timedelta(hours=1),
    'catchup': False
}

dag = DAG('dag-name',
          schedule_interval=datetime.timedelta(hours=2),
          default_args=default_args,
          max_active_runs=1,
          concurrency=1,
          catchup=False)

DEFAULT_OPTIONS_TEMPLATE = {
    'project': PROJECT,
    'stagingLocation': 'gs://{}/staging'.format(GCS_BUCKET),
    'tempLocation': 'gs://{}/temp'.format(GCS_BUCKET)
}

def my_dataflow_job(template_location, name):
    run_time = datetime.datetime.utcnow()
    a_value = run_time.strftime('%Y%m%d%H')

    t1 = DataflowTemplateOperator(
        task_id='{}-task'.format(name),
        template=template_location,
        parameters={'an_argument': a_value},
        dataflow_default_options=DEFAULT_OPTIONS_TEMPLATE,
        poll_sleep=30
    )

    t2 = BashOperator(
        task_id='{}-loader-heartbeat'.format(name),
        bash_command='curl --fail -XGET "[a heartbeat URL]" --header "Authorization: heartbeat_service {1}"'.format(name, API_KEY)
    )

    t1 >> t2

with dag:
    my_dataflow_job('gs://[path to gcs]'.format(GCS_BUCKET), 'name')

As you can see, I'm trying very hard to prevent Airflow from backfilling. Yet when I deploy the DAG (late in the day, on 7/30/2019), it just keeps triggering runs back to back, one after the other.

Since this task is moving a bit of data around, this is not desirable. How do I get Airflow to respect the "run this every two hours" schedule_interval?
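
For context, here is a rough sketch of why the runs pile up when catchup is in effect. The 22:00 UTC deploy time is just an assumption to make the arithmetic concrete; everything else comes from the DAG above:

import datetime

start_date = datetime.datetime(2019, 7, 30)       # the DAG's start_date
deploy_time = datetime.datetime(2019, 7, 30, 22)  # hypothetical deploy time, late on 7/30
interval = datetime.timedelta(hours=2)            # the schedule_interval

# Every interval that has fully elapsed since start_date is a candidate
# DAG run for the scheduler when catchup applies.
eligible = []
execution_date = start_date
while execution_date + interval <= deploy_time:
    eligible.append(execution_date)
    execution_date += interval

print(len(eligible))  # 11 runs would be queued back to back with these assumptions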

As you can see, I've set catchup: False in both the DAG args AND the default_args (just in case; I started with it only in the DAG args). The retry delay is also a long period.

Each DAG run is reported as a success. I'm running with the following version:

composer-1.5.0-airflow-1.10.1

My next step is a Kubernetes cron job...

1 Answer

I suspect you did not have catchup=False when you first created the DAG. I think Airflow may not recognize changes to the catchup parameter after the initial DAG creation.

Try renaming the DAG and see what happens, e.g. add a v2 suffix to the dag_id and enable it. After enabling it, the DAG will run once even though catchup is False, because there is one valid completed interval (i.e. the current time is >= start_date + schedule_interval), but that is all.
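
A minimal sketch of what I mean, reusing the imports and default_args from your file (the -v2 suffix is arbitrary):

# Same definition as before, but under a new dag_id, so Airflow registers it as a
# brand-new DAG that has catchup=False from its very first parse.
dag = DAG('dag-name-v2',
          schedule_interval=datetime.timedelta(hours=2),
          default_args=default_args,
          max_active_runs=1,
          concurrency=1,
          catchup=False)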

Of course, test with a fake operator that doesn't do anything expensive.
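
For instance, something along these lines inside the renamed DAG file, with DummyOperator (available in Airflow 1.10.1) standing in for the Dataflow and curl tasks; the task_ids are just placeholders:

from airflow.operators.dummy_operator import DummyOperator

# Cheap no-op tasks with the same shape as the real pipeline, so you can
# watch the scheduling behaviour without moving any data.
with dag:
    t1 = DummyOperator(task_id='dataflow-task-placeholder')
    t2 = DummyOperator(task_id='loader-heartbeat-placeholder')
    t1 >> t2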