Similar to the comment on your question, the workaround I used when backfilling a large database was to have a DAG generator create three DAGs (two backfill and one ongoing) based on the connection_created_on and start_date values.
The ongoing DAG runs hourly and begins at midnight on the same day as the connection_created_on value. The daily backfill then covers the period from the first of that month up to the day before the hourly DAG starts, and the monthly backfill covers everything from the first month of start_date up to where the daily backfill begins. In this case I knew we would always want to start on the first of the month and that up to a month of data was small enough to pull in a single run, so I split the work into these three DAG types for expediency.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def create_dag(dag_id,
               schedule,
               db_conn_id,
               default_args,
               catchup=False,
               max_active_runs=3):
    dag = DAG(dag_id,
              default_args=default_args,
              schedule_interval=schedule,
              catchup=catchup,
              max_active_runs=max_active_runs)

    with dag:
        # Placeholder task; the real extract/load tasks hang off of this.
        kick_off_dag = DummyOperator(task_id='kick_off_dag')

    return dag
db_conn_id = 'my_first_db_conn'
connection_created_on = '2018-05-17T12:30:54.271Z'

hourly_id = '{}_to_redshift_hourly'.format(db_conn_id)
daily_id = '{}_to_redshift_daily_backfill'.format(db_conn_id)
monthly_id = '{}_to_redshift_monthly_backfill'.format(db_conn_id)

# Parse the timestamps (note the 'T' separator, which must match the strptime
# format) and snap start_date back to the first of its month.
start_date = '2005-01-01T00:00:00.000Z'
start_date = datetime.strptime(start_date, '%Y-%m-%dT%H:%M:%S.%fZ')
start_date = datetime(start_date.year, start_date.month, 1)
cco_datetime = datetime.strptime(connection_created_on, '%Y-%m-%dT%H:%M:%S.%fZ')

# Hourly runs start at midnight on the day the connection was created; the
# daily backfill covers the first of that month up to the day before, and the
# monthly backfill covers start_date up to where the daily backfill begins.
hourly_start_date = datetime(cco_datetime.year, cco_datetime.month, cco_datetime.day)
daily_start_date = hourly_start_date - timedelta(days=(cco_datetime.day - 1))
daily_end_date = hourly_start_date - timedelta(days=1)
monthly_start_date = start_date if start_date else hourly_start_date - timedelta(days=365 + cco_datetime.day - 1)
monthly_end_date = daily_start_date
globals()[hourly_id] = create_dag(hourly_id,
                                  '@hourly',
                                  db_conn_id,
                                  {'start_date': hourly_start_date,
                                   'retries': 2,
                                   'retry_delay': timedelta(minutes=5),
                                   'email': [],
                                   'email_on_failure': True,
                                   'email_on_retry': False},
                                  catchup=True,
                                  max_active_runs=1)

globals()[daily_id] = create_dag(daily_id,
                                 '@daily',
                                 db_conn_id,
                                 {'start_date': daily_start_date,
                                  'end_date': daily_end_date,
                                  'retries': 2,
                                  'retry_delay': timedelta(minutes=5),
                                  'email': [],
                                  'email_on_failure': True,
                                  'email_on_retry': False},
                                 catchup=True)

globals()[monthly_id] = create_dag(monthly_id,
                                   '@monthly',
                                   db_conn_id,
                                   {'start_date': monthly_start_date,
                                    'end_date': monthly_end_date,
                                    'retries': 2,
                                    'retry_delay': timedelta(minutes=5),
                                    'email': [],
                                    'email_on_failure': True,
                                    'email_on_retry': False},
                                   catchup=True)
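For completeness, here is a minimal sketch of how the same generator could be driven by a list of connections instead of a single hard-coded db_conn_id. The connections list and its keys are hypothetical placeholders (in practice these values might come from a metadata table or an Airflow Variable); it simply reuses the create_dag function and the date logic shown above.

# Hypothetical list of source connections; each entry feeds the same
# windowing logic and create_dag calls shown above.
connections = [
    {'db_conn_id': 'my_first_db_conn',
     'connection_created_on': '2018-05-17T12:30:54.271Z',
     'start_date': '2005-01-01T00:00:00.000Z'},
    # one entry per source database
]

for conn in connections:
    cco = datetime.strptime(conn['connection_created_on'], '%Y-%m-%dT%H:%M:%S.%fZ')
    hourly_start = datetime(cco.year, cco.month, cco.day)
    hourly_id = '{}_to_redshift_hourly'.format(conn['db_conn_id'])

    # The daily and monthly backfill DAGs would be built the same way as above,
    # then registered at module level so the scheduler can discover them.
    globals()[hourly_id] = create_dag(hourly_id,
                                      '@hourly',
                                      conn['db_conn_id'],
                                      {'start_date': hourly_start,
                                       'retries': 2,
                                       'retry_delay': timedelta(minutes=5)},
                                      catchup=True,
                                      max_active_runs=1)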
catchup=True on the DAG fixes this. - totooooo