I have an Airflow script that tries to insert data from one table into another on an Amazon Redshift DB. When the script below is triggered, it does not execute: the task status remains 'no status' in the Graph View and no other error is shown.
## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")

#######################
## Login to DB
def db_login():
    global db_conn
    try:
        db_conn = psycopg2.connect(
            " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection Task Complete: Connected to DB')
    return (db_conn)

#######################
def insert_data():
    cur = db_conn.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2 ;""")
    db_conn.commit()
    print('ETL Task Complete')

def job_run():
    db_login()
    insert_data()

##########################################
t1 = PythonOperator(
    task_id='DBConnect',
    python_callable=job_run,
    bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)

t1
Could anyone help me find where the problem is? Thanks.
Updated Code (05/28)
## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 23, 12),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")

#######################
## Login to DB
def data_warehouse_login():
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(
            " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
    print('Connected successfully')
    return (dwh_connection)

def insert_data():
    cur = dwh_connection.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
    dwh_connection.commit()
    print('Task Complete: Insert success')

def job_run():
    data_warehouse_login()
    insert_data()

##########################################
t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run(),
    # bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)

t1
Log message when running the script
[2018-05-28 11:36:45,300] {jobs.py:343} DagFileProcessor26 INFO - Started process (PID=26489) to work on /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:45,306] {jobs.py:534} DagFileProcessor26 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
[2018-05-28 11:36:45,310] {jobs.py:1521} DagFileProcessor26 INFO - Processing file /Users/user/airflow/dags/sample.py for tasks to queue
[2018-05-28 11:36:45,310] {models.py:167} DagFileProcessor26 INFO - Filling up the DagBag from /Users/user/airflow/dags/sample.py
/Users/user/anaconda3/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
""")
Task Complete: Insert success
[2018-05-28 11:36:50,964] {jobs.py:1535} DagFileProcessor26 INFO - DAG(s) dict_keys(['latest_only', 'example_python_operator', 'test_utils', 'example_bash_operator', 'example_short_circuit_operator', 'example_branch_operator', 'tutorial', 'example_passing_params_via_test_command', 'latest_only_with_trigger', 'example_xcom', 'example_http_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_branch_dop_operator_v3', 'example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2', 'example_trigger_controller_dag', 'insert_data2']) retrieved from /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:51,159] {jobs.py:1169} DagFileProcessor26 INFO - Processing example_subdag_operator
[2018-05-28 11:36:51,167] {jobs.py:566} DagFileProcessor26 INFO - Skipping SLA check for <DAG: example_subdag_operator> because no tasks in DAG have SLAs
[2018-05-28 11:36:51,170] {jobs.py:1169} DagFileProcessor26 INFO - Processing sample_dag
[2018-05-28 11:36:51,174] {jobs.py:354} DagFileProcessor26 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1171, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 776, in create_dag_run
    if next_start <= now:
TypeError: '<=' not supported between instances of 'NoneType' and 'datetime.datetime'
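A side note on the log above: the 'Task Complete: Insert success' line is printed while the DagFileProcessor is still filling the DagBag, which suggests job_run() is being invoked at parse time because of the parentheses in python_callable=job_run(). python_callable normally expects the function object itself, not its return value. A minimal sketch of that one change, kept otherwise identical to the operator in the updated code:

# Sketch only: passing the function object defers execution to task run time,
# whereas python_callable=job_run() runs job_run during DAG parsing and hands
# the operator its return value (None).
t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run,   # no parentheses
    dag=dag)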
Log from the Graph View
*** Log file isn't local.
*** Fetching here: http://:8793/log/sample_dag/DWH_Connect/2018-05-28T12:23:57.595234
*** Failed to fetch log file from worker.
*** Reading remote logs...
*** Unsupported remote log location.

bash_command='python3 ~/airflow/dags/sample.py' in PythonOperator? I'd suggest you run python your_dag_file.py to figure out any compile error? - Chengzhi
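Following the commenter's suggestion, one way to surface parse-time problems before looking at the scheduler is to exercise the DAG file directly (a sketch, assuming the Airflow 1.x CLI these logs appear to come from):

python3 ~/airflow/dags/sample.py    # surfaces import/syntax errors in the DAG file itself
airflow list_tasks sample_dag       # checks that the scheduler can parse the DAG and see its task(s)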