
I'm trying to use the HdfsSensor operator in Airflow to trigger downstream tasks when a file arrives at a given path. But when I deploy the DAG, I get this error:

Broken DAG: [/usr/local/airflow/dags/test_sensor_dag.py] invalid syntax (client.py, line 1473)

Code:

from airflow import DAG
from airflow.sensors.hdfs_sensor import HdfsSensor
from airflow.operators.email_operator import EmailOperator
from datetime import datetime


default_args = {'owner': 'airflow',
                'depends_on_past': False,
                'provide_context': True,
                'start_date': datetime(2020, 3, 19, 0, 0),
                'email': ['hr@***.com'],
                'email_on_failure': True,
                'email_on_retry': False,
                'retries': 0,
                'concurrency': 1
                }

# run it daily at 6AM
schedule_interval = '00 6 * * *'

dag_name = 'test_sensor_dag'

dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=schedule_interval)

source_data_sensor = HdfsSensor(
    task_id='source_data_sensor',
    filepath='/data/test/file.csv',
    poke_interval=10,
    timeout=5,
    dag=dag
).poke()

success_notification = EmailOperator(to=['hr@***.com'], task_id='success_notification',
                                     subject='[Success:] test for {{ ds }}',
                                     html_content='Successfully ran the DAG',
                                     dag=dag)

source_data_sensor
success_notification.set_upstream(source_data_sensor)

3 Answers

0 votes

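You don't need to call .poke() in your code; Airflow calls the sensor's poke(context) method itself when the task runs. Calling it yourself while the DAG file is being parsed also fails, because poke() expects the task context as an argument.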
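To show why the explicit call is unnecessary, here is a simplified, framework-free model of the loop a sensor task runs. It is a sketch, not Airflow's actual code: `run_sensor` and `SensorTimeout` are hypothetical names, and in Airflow this loop lives inside the sensor's own `execute()` method, which passes the task context to `poke`.

```python
import time
from typing import Any, Callable, Dict


class SensorTimeout(Exception):
    """Hypothetical stand-in for the exception a real sensor raises when it gives up."""


def run_sensor(poke: Callable[[Dict[str, Any]], bool],
               context: Dict[str, Any],
               poke_interval: float,
               timeout: float) -> int:
    """Simplified model of how a sensor task executes (not Airflow's real code).

    The framework, not the DAG file, calls poke(context) repeatedly. If it
    returns False and more than `timeout` seconds have elapsed, the sensor
    fails; otherwise it sleeps `poke_interval` seconds and pokes again.
    Returns the number of pokes made before the condition was met.
    """
    started = time.monotonic()
    pokes = 0
    while True:
        pokes += 1
        if poke(context):
            return pokes
        if time.monotonic() - started > timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        time.sleep(poke_interval)
```

Under this model, the question's settings (`poke_interval=10`, `timeout=5`) would poke twice and then give up, since the timeout is shorter than the wait between pokes; a timeout several times larger than the poke interval is usually what you want.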

0 votes

Running the following sequence of commands worked for me while installing Airflow:

   pip install snakebite-py3
   pip install 'azure-storage>=0.34.0,<0.37.0'
   pip install 'apache-airflow[all]'

UPDATE: Better still, if you're not using Azure, install only the Airflow extras you actually need instead of apache-airflow[all]. Installing snakebite-py3 is still a must, though.
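If you want to confirm that the failing client.py belongs to snakebite before reinstalling, you can import that module directly. This is a sketch under assumptions: that the HDFS hook imports `snakebite.client`, and that the installed `snakebite` is the Python 2-only package rather than `snakebite-py3`. `import_error_for` is a hypothetical helper. A Python 2-only source file fails under Python 3 with a `SyntaxError`, and `str()` of that error has the same "invalid syntax (file, line N)" shape as the Broken DAG message.

```python
import importlib
from typing import Optional


def import_error_for(module_name: str) -> Optional[BaseException]:
    """Try to import module_name; return the exception raised, or None on success.

    A SyntaxError (rather than ImportError) means the package is installed but
    its source is not valid for the running Python version, e.g. a Python 2-only
    package imported under Python 3.
    """
    try:
        importlib.import_module(module_name)
    except (ImportError, SyntaxError) as exc:
        return exc
    return None


if __name__ == "__main__":
    # Assumption: snakebite.client is the client.py named in the Broken DAG error.
    err = import_error_for("snakebite.client")
    if err is None:
        print("snakebite.client imports cleanly")
    else:
        print(f"{type(err).__name__}: {err}")
```

Run it with the same Python interpreter that Airflow uses; a `SyntaxError` there points at the package, not at your DAG file.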

0 votes

Your problem isn't poke() or anything like that. Assuming your HDFS provider is installed correctly, simplify your DAG (keep only the HdfsSensor task and remove the email task), run it again, and then post more of the error output.

FYI: client.py is related to the HTTP and XML-RPC clients, so check your cnf file, make sure the configuration you provided has the correct form and syntax, and run it again. Also make sure you follow best practices to avoid bad imports.
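If the config file in question is INI-style, as airflow.cfg is, one quick way to rule out a syntax problem is to parse it with Python's standard `configparser`. A minimal sketch: `cfg_syntax_error` is a hypothetical helper, and it only checks INI syntax (section headers, `key = value` lines, duplicate sections or keys), not whether the values themselves are correct for your setup.

```python
import configparser
from typing import Optional


def cfg_syntax_error(path: str) -> Optional[str]:
    """Parse an INI-style config file.

    Returns a short description of the first problem found (unreadable file or
    INI syntax error), or None if the file parses cleanly.
    """
    # strict=True (the default) also rejects duplicate sections and duplicate keys.
    parser = configparser.ConfigParser()
    try:
        with open(path, encoding="utf-8") as fh:
            parser.read_file(fh)
    except OSError as exc:
        return f"{type(exc).__name__}: {exc}"
    except configparser.Error as exc:
        return f"{type(exc).__name__}: {exc}"
    return None
```

For example, `cfg_syntax_error("/path/to/airflow.cfg")` (path hypothetical) returns `None` for a well-formed file, or a message such as `MissingSectionHeaderError: ...` pointing at the offending line.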