
I am new to Airflow and have created my first DAG. Here is my DAG code. I want the DAG to start now and then run once a day.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'alamode', default_args=default_args, schedule_interval=timedelta(1))

create_command = "/home/ubuntu/scripts/makedir.sh "

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command=create_command,
    dag=dag)

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh "
# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_scrawl',
    bash_command=run_spiders,
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)

The DAG is not getting picked up by Airflow. I checked the log, and here is what it says:

[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds

What exactly am I doing wrong? I have tried changing the schedule_interval to schedule_interval=timedelta(minutes=1) to see if it starts immediately, but to no avail. I can see the tasks under the DAG as expected in the Airflow UI, but their schedule status shows as 'no status'. Please help me here.

Have you turned the DAG switch in the UI on? – Chengzhi
Yes, the button is on. Still, it is not getting picked up. – Anju
Have you invoked airflow worker and airflow scheduler? – Vinod Vutpala
Issue resolved by following these steps: 1) I used a much older date for start_date and schedule_interval=timedelta(minutes=10). 2) Added catchup = True to the DAG arguments. 3) Set the environment variable with export AIRFLOW_HOME=$(pwd)/airflow_home. 4) Deleted airflow.db. 5) Moved the new code to the DAGs folder. 6) Ran the command 'airflow initdb'. 7) Turned on my DAG through the UI. – Anju
@Anju, glad you figured it out. Also, if you are using version 1.8+, you might want to use a constant start date instead of datetime.now(); here is the reference: github.com/apache/incubator-airflow/blob/master/… – Chengzhi
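To illustrate the comment above, here is a minimal sketch of what a constant start_date looks like in default_args; the date itself is only an example:

from datetime import datetime

default_args = {
    'owner': 'airflow',
    # A fixed, past date lets the scheduler compute schedule intervals deterministically.
    # datetime.now() moves every time the DAG file is parsed, so no interval ever
    # completes and no run is ever triggered.
    'start_date': datetime(2017, 9, 1),
}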

1 Answer


This issue was resolved by following the steps below:

1) Used a much older date for start_date and schedule_interval=timedelta(minutes=10); also used a fixed date instead of datetime.now().
2) Added catchup = True to the DAG arguments.
3) Set the environment variable with export AIRFLOW_HOME=$(pwd)/airflow_home.
4) Deleted airflow.db.
5) Moved the new code to the DAGs folder.
6) Ran the command 'airflow initdb' to recreate the database.
7) Turned on the DAG's switch in the UI.
8) Ran the command 'airflow scheduler'.

Here is the code which works now:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 9, 12),
    'email': ['[email protected]'],
    'retries': 0,
    'retry_delay': timedelta(minutes=15)
}

dag = DAG(
    'alamode', catchup=False, default_args=default_args, schedule_interval="@daily")

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command='/home/ubuntu/scripts/makedir.sh ',
    dag=dag)


# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_crawl',
    bash_command='/home/ubuntu/scripts/crawl_spiders.sh ',
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)
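
As a side note, on Airflow 1.8+ the same dependency can also be expressed with the bitshift operator, which many find more readable:

# Equivalent to t2.set_upstream(t1): t1 runs before t2
t1 >> t2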