2
votes

I have a Python script that points to an Elasticsearch cluster, performs aggregations and computations on the data, and then stores the insights in a local PostgreSQL database. The script can be run daily or weekly depending on the user's preference, as follows:

python script.py --approach daily 
python script.py --approach weekly

I want to automate this workflow so that it runs every 10 minutes via Airflow.

My guess is to go for the BashOperator: create a task t1 = BashOperator that executes the bash command python script.py --approach daily in one DAG, and t2 = BashOperator that executes the bash command python script.py --approach weekly.

The code didn't give the expected result: in the Airflow web UI, all tasks seem to stay stuck in the scheduled state.

Can anyone tell me what I have been doing wrong?


    # imports
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator

    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'me',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 3,
        'max_tries': 3,
        'retry_delay': timedelta(minutes=10)
    }

    etl_dag = DAG('tester', default_args=default_args, schedule_interval='@once')

    # the BashOperator to execute the bash command, to automate the task execution every 5 min
    weekly_task = BashOperator(
        task_id='testing',
        bash_command='python my_script.py --approach weekly',
        dag=etl_dag)

It's not clear what you want to do. You talk about executing the script daily & weekly, but you also talk about running it every 5/10 min. Can you please clarify what needs to be executed and when? – Elad
The goal is to automate the execution daily & weekly. I suggested 5 minutes just to see the logs and be sure that it works properly, rather than waiting a whole day for it to be triggered. But to clarify, what I want is for the script to be automated daily and weekly. Any ideas how? Thank you very much. – Houssem Bzi
NOTE: I can reformulate my question if you find it best to do so. – Houssem Bzi
What did you try to achieve by setting datetime.combine in start_date? – Elad

2 Answers

0
votes

You have several approaches here:

  1. Write two DAGs: one for the daily run and one for the weekly run.
  2. Write one DAG using the BranchDayOfWeekOperator, which branches your workflow depending on the day of the week. For example: on Monday it will run --approach weekly, and on all other days it will run --approach daily.

If you are running Airflow >= 2.1.0 (not yet released):

from airflow import DAG
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='tester',
    default_args=default_args,  # the default_args dict from your question
    schedule_interval='@daily'
) as dag:

    branch_op = BranchDayOfWeekOperator(
        task_id="branch_task",
        follow_task_ids_if_true="weekly_task",
        follow_task_ids_if_false="daily_task",
        week_day="MONDAY",  # replace with the day you want to run the weekly approach
        use_task_execution_day=False,  # set True if the day should be checked against execution_date
    )
    weekly_op = BashOperator(
        task_id='weekly_task',
        bash_command='python my_script.py --approach weekly',
    )
    daily_op = BashOperator(
        task_id='daily_task',
        bash_command='python my_script.py --approach daily',
    )

    branch_op >> [weekly_op, daily_op]

If you are running Airflow < 2.1.0:

Copy the BranchDayOfWeekOperator code into your project, import it locally, and use the same code as above; a sketch of the local import is shown below. The BranchDayOfWeekOperator is new in the Airflow 2.1 release. Note that you might need to change a few imports in the operator depending on which Airflow version you are running.
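
For illustration, a minimal sketch of the local-copy approach, assuming you saved the copied operator source as weekday_operator.py next to your DAG file (the file name and location are assumptions, not a fixed convention):

# weekday_operator.py is your local copy of the BranchDayOfWeekOperator source.
# On Airflow 1.10.x you may need to adjust imports inside that copy, e.g.
# "from airflow.operators.branch import BaseBranchOperator" ->
# "from airflow.operators.branch_operator import BaseBranchOperator".
from weekday_operator import BranchDayOfWeekOperator
from airflow.operators.bash_operator import BashOperator  # 1.10.x import path

# The DAG definition itself stays the same as in the Airflow >= 2.1.0 example above.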

0
votes

Based on your syntax, I'm going to assume you are running Airflow 1.10.14.

I think you are trying to accomplish the following:

  • run my_script.py daily
  • run my_script.py weekly

You will need to alter your script so that it performs exactly one workflow cycle per invocation, because Airflow will execute it once per schedule interval. If the script has an internal scheduling process of its own (assuming that's what --approach determines), it will conflict with Airflow's core purpose, which is to manage your workflows: each time Airflow runs the script, it would start another process that keeps running weekly (for --approach weekly), multiplying the workflows.

You want Airflow to schedule your workflow, not the script.
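
For illustration, a minimal sketch of what a single-cycle my_script.py could look like (the function name and the --approach handling here are assumptions, not your actual code):

import argparse

def run_cycle(approach):
    # Hypothetical placeholder: query Elasticsearch, aggregate,
    # and write the insights to PostgreSQL exactly once.
    print(f"running one {approach} cycle")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--approach", choices=["daily", "weekly"], required=True)
    args = parser.parse_args()

    # No internal loop or sleep/schedule logic here:
    # Airflow decides when this runs.
    run_cycle(args.approach)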


Two DAGs with separate schedule_interval will work here.

daily_dag.py

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

from datetime import datetime

dag = DAG(
    dag_id='daily_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval='0 7 * * *'
)
with dag:
    run_script = BashOperator(
        task_id='run_script',
        bash_command='python my_script.py',
    )

This DAG will run the workflow every day at 7 UTC.

weekly_dag.py

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

from datetime import datetime

dag = DAG(
    dag_id='weekly_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval='0 7 * * Mon'
)
with dag:
    run_script = BashOperator(
        task_id='run_script',
        bash_command='python my_script.py',
    )

This DAG will run the workflow every Monday at 7 UTC.

Keeping them separate gives you cleaner pipelines as well as a clearer definition of what each pipeline does at its own frequency.

I would also look into converting those BashOperators into PythonOperators to remove a layer of abstraction, executing the Python code directly instead of going through a shell.
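
For example, a minimal sketch of that conversion for the daily DAG, assuming my_script.py is importable and exposes a main(approach) entry point (both of which are assumptions about your script):

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator  # 1.10.x import path

from datetime import datetime

from my_script import main  # assumes my_script.py defines main(approach)

dag = DAG(
    dag_id='daily_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval='0 7 * * *'
)
with dag:
    run_script = PythonOperator(
        task_id='run_script',
        python_callable=main,
        op_kwargs={'approach': 'daily'},  # passed to main() when the task runs
    )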