I have a Python script that points to an Elasticsearch cluster, performs aggregations and computations on the data, and then stores the resulting insights in a local PostgreSQL database. The script can be run daily or weekly depending on the user's preference, as follows:
python script.py --approach daily
python script.py --approach weekly
I want to automate this data workflow so that it runs every 10 minutes via Airflow.
My guess is to use the BashOperator: inside one DAG, create a task t1 = BashOperator(...) that executes the bash command python script.py --approach daily, and a task t2 = BashOperator(...) that executes python script.py --approach weekly.
The code doesn't give the expected result: in the Airflow web UI, all the runs just sit in the "scheduled" state. Can anyone tell me what I have been doing wrong? Here is my DAG:
#imports
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': seven_days_ago,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'max_tries': 3,
    'retry_delay': timedelta(minutes=10)
}
etl_dag = DAG('tester', default_args=default_args, schedule_interval='@once')
# the BashOperator that executes the bash command; this is the task I want to run every 10 minutes
weekly_task = BashOperator(
    task_id='testing',
    bash_command='python my_script.py --approach weekly',
    dag=etl_dag)
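From what I have read, my guess is that '@once' should be replaced with an actual interval and that backfilling should be turned off, roughly like the sketch below (the script path, task ids, and start date are just placeholders for my setup). Is this the right direction, or is something else keeping the runs stuck in "scheduled"?

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),  # a fixed date in the past
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
}

etl_dag = DAG(
    'tester',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10),  # run every 10 minutes
    catchup=False,                            # don't backfill runs since start_date
)

daily_task = BashOperator(
    task_id='daily_insights',
    bash_command='python /path/to/script.py --approach daily',   # placeholder path
    dag=etl_dag,
)

weekly_task = BashOperator(
    task_id='weekly_insights',
    bash_command='python /path/to/script.py --approach weekly',  # placeholder path
    dag=etl_dag,
)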