I have to schedule a DAG that should run on the 15th of every month. However, if the 15th falls on a Saturday or Sunday, the DAG should skip the weekend and run on the following Monday.

For example, May 15, 2021 falls on a Saturday, so instead of running on May 15 the DAG should run on May 17, which is a Monday.

Can you please help me schedule this in Airflow?

Thanks in advance!

2 Answers

Scheduling logic is limited to what a single cron expression can describe: if you can't say it in a cron expression, you can't schedule it that way in Airflow. For that reason there is an open Airflow Improvement Proposal, AIP-39 Richer scheduler_interval, to add more scheduling capabilities (it has since been implemented as Timetables in Airflow 2.2).
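
To see why one cron expression doesn't fit, here is a plain-Python sketch of the rule from the question (not Airflow code; monthly_run_date is a hypothetical helper name). It returns the 15th, moved forward to Monday when the 15th lands on a weekend:

```python
from datetime import date, timedelta


def monthly_run_date(year: int, month: int, day: int = 15) -> date:
    """Return the given day of the month, moved forward to Monday on weekends."""
    run = date(year, month, day)
    # date.weekday(): Monday == 0 ... Friday == 4, Saturday == 5, Sunday == 6
    if run.weekday() >= 5:
        # Saturday -> +2 days, Sunday -> +1 day; both land on Monday
        run += timedelta(days=7 - run.weekday())
    return run


if __name__ == "__main__":
    for month in range(1, 13):
        print(monthly_run_date(2021, month))
```

For 2021 this gives the 15th in every month except May (the 17th) and August (the 16th), so the target day of month shifts from month to month depending on the weekday.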

That said, you can still get the desired behavior by writing some code. Schedule the DAG for the 15th of each month and put a sensor in front of your tasks that checks whether the day is Monday to Friday; if it isn't, the sensor waits until it is:

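from datetime import datetime

from airflow import DAG
from airflow.sensors.weekday import DayOfWeekSensor

# Assumed defaults for illustration; set owner/start_date for your setup.
default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}

dag = DAG(
    dag_id='work',
    schedule_interval='0 0 15 * *',  # 00:00 on the 15th of every month
    default_args=default_args,
    catchup=False,  # don't backfill past months from start_date
    description='Schedule a job on the 15th of each month',
)

# Passes Monday-Friday. By default it checks the current (UTC) day, so on a
# weekend the task is rescheduled and re-checked until Monday.
weekend_check = DayOfWeekSensor(
    task_id='weekday_check_task',
    week_day={'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'},
    mode='reschedule',  # free the worker slot between checks
    dag=dag,
)

# YourOperator is a placeholder for whatever operator does the real work.
op_1 = YourOperator(task_id='op1_task', dag=dag)

weekend_check >> op_1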

Note: If you are running airflow<2.0.0 you will need to change the import to:

from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor
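
To make the sensor's timing concrete, here is a rough plain-Python model of a reschedule-mode weekday check. It is an assumption-based sketch, not the DayOfWeekSensor implementation, and first_release is a hypothetical helper: it checks at the trigger time (taken as UTC) and, while the day is Saturday or Sunday, checks again one poke interval later.

```python
from datetime import datetime, timedelta

WORKDAYS = {0, 1, 2, 3, 4}  # Monday..Friday as returned by datetime.weekday()


def first_release(triggered_at: datetime, poke_interval: timedelta) -> datetime:
    """Return the first check time at which a Monday-Friday test passes.

    Rough model of a reschedule-mode weekday sensor: check at the trigger
    time and, if the day is Saturday or Sunday, check again poke_interval later.
    """
    if poke_interval <= timedelta(0):
        raise ValueError("poke_interval must be positive")
    poke = triggered_at
    while poke.weekday() not in WORKDAYS:
        poke += poke_interval
    return poke


if __name__ == "__main__":
    saturday = datetime(2021, 5, 15)  # May 15, 2021 is a Saturday
    print(first_release(saturday, timedelta(hours=1)))
    print(first_release(saturday, timedelta(hours=7)))
```

For a run triggered at 00:00 on Saturday, May 15, 2021, a one-hour interval releases at 00:00 on Monday, May 17, while a seven-hour interval releases at 01:00 that Monday: the downstream task starts at the first check after Monday begins.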

The answer posted by Elad works pretty well. I came up with another solution that works as well.

I scheduled the job to run on the 15th, 16th and 17th of every month and added a condition so that the real work only happens on the 15th if it is a weekday, or on the 16th or 17th if that day is a Monday.

To achieve that, I added a BranchPythonOperator:

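from datetime import datetime

from airflow import DAG
# Airflow 1.10 import paths; on Airflow 2 use airflow.operators.python
# and airflow.operators.dummy instead.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

# Assumed defaults for illustration; set owner/start_date for your setup.
default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}


def _conditional_task_initiator(**kwargs):
    # A scheduled run fires at the end of its interval, so the day it actually
    # runs is next_execution_date; execution_date is the previous 15-17 slot.
    # (On Airflow 2.2+, kwargs['data_interval_end'] is the preferred name.)
    run_date = kwargs['next_execution_date']
    day = run_date.day
    weekday = run_date.weekday()  # Monday == 0 ... Sunday == 6
    if day == 15 and weekday < 5:
        # The 15th is a weekday: run as normal.
        return 'dummy_task_run_cmo_longit'
    if day in (16, 17) and weekday == 0:
        # The 15th fell on a weekend and this is the following Monday.
        return 'dummy_task_run_cmo_longit'
    return 'dummy_task_skip_cmo_longit'


with DAG(dag_id='NXS_FM_LOAD_CMO_CHOICE_LONGIT', default_args=default_args,
         schedule_interval='0 8 15-17 * *', catchup=False) as dag:
    conditional_task_initiator = BranchPythonOperator(
        task_id='cond_task_check_day',
        provide_context=True,  # Airflow 1.10 only; Airflow 2 passes the context automatically
        python_callable=_conditional_task_initiator,
        do_xcom_push=False)
    dummy_task_run_cmo_longit = DummyOperator(
        task_id='dummy_task_run_cmo_longit')
    dummy_task_skip_cmo_longit = DummyOperator(
        task_id='dummy_task_skip_cmo_longit')

    conditional_task_initiator >> [dummy_task_run_cmo_longit, dummy_task_skip_cmo_longit]

    # Chain the tasks that do the real work after the "run" branch, e.g.:
    # dummy_task_run_cmo_longit >> main_task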
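
A detail worth checking with this approach: Airflow triggers a scheduled run at the end of its interval, so the execution_date in the task context is the previous slot of the schedule, not the day the run fires (the fire time is next_execution_date, or data_interval_end from Airflow 2.2). The stdlib sketch below is not Airflow code; it hard-codes the 0 8 15-17 * * schedule and uses a hypothetical execution_date_for helper to map a fire time to the execution_date that run would carry:

```python
from datetime import datetime

SLOT_DAYS = (15, 16, 17)  # days matched by the cron "0 8 15-17 * *"
SLOT_HOUR = 8


def execution_date_for(fired_at: datetime) -> datetime:
    """Return the execution_date of the run that fires at `fired_at`.

    Hard-codes the schedule '0 8 15-17 * *': a run fires at the end of its
    interval, so its execution_date is the slot just before `fired_at`.
    """
    if fired_at.day not in SLOT_DAYS or (fired_at.hour, fired_at.minute) != (SLOT_HOUR, 0):
        raise ValueError("fired_at must be a slot of '0 8 15-17 * *'")
    if fired_at.day != SLOT_DAYS[0]:
        # The 16th or 17th: the previous slot is the day before.
        return fired_at.replace(day=fired_at.day - 1)
    # The 15th: the previous slot is the 17th of the previous month.
    if fired_at.month == 1:
        return fired_at.replace(year=fired_at.year - 1, month=12, day=SLOT_DAYS[-1])
    return fired_at.replace(month=fired_at.month - 1, day=SLOT_DAYS[-1])


if __name__ == "__main__":
    print(execution_date_for(datetime(2021, 5, 15, 8)))  # fires Saturday, May 15
```

The run that fires on Saturday, May 15, 2021 carries an execution_date of April 17, and the run that fires on Monday, May 17 carries May 16, so a weekday check against execution_date tests the previous slot's day rather than the day the run fires.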

With this setup the DAG is triggered on the 15th, 16th and 17th of every month, but the functional tasks run only once a month: on the 15th if it is a weekday, otherwise on the following Monday.
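
The once-a-month property can be checked outside Airflow. The plain-Python sketch below restates the branch condition as a hypothetical should_run function on a date and counts, for each month, how many of the 15th, 16th and 17th pass it:

```python
from datetime import date


def should_run(d: date) -> bool:
    """Branch condition: a weekday 15th, or a Monday 16th or 17th."""
    if d.day == 15:
        return d.weekday() < 5
    if d.day in (16, 17):
        return d.weekday() == 0  # Monday
    return False


def runs_in_month(year: int, month: int) -> int:
    """Count how many of the 15th, 16th and 17th pass the branch condition."""
    return sum(should_run(date(year, month, dom)) for dom in (15, 16, 17))


if __name__ == "__main__":
    # Check every month from 2000 through 2099.
    print(all(runs_in_month(y, m) == 1 for y in range(2000, 2100) for m in range(1, 13)))
```

It prints True: if the 15th is a weekday, neither the 16th nor the 17th can be a Monday; if the 15th is a Saturday, only the 17th is a Monday; if it is a Sunday, only the 16th is.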