
I am new to Python and Airflow DAGs. I am following the link below and the code mentioned in its answer section:
How to pass dynamic arguments Airflow operator?

I am facing an issue reading a YAML file. The YAML file holds some configuration-related arguments:

configs:
    cluster_name: "test-cluster"
    project_id: "t***********"
    zone: "europe-west1-c"
    num_workers: 2
    worker_machine_type: "n1-standard-1"
    master_machine_type: "n1-standard-1"
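
For reference, PyYAML parses this file into a nested dict, so each value can be reached as cfg['configs'][...]. A minimal sketch of the parse (assuming PyYAML is installed and the file sits next to the script):

import yaml

# Parse the configuration above; safe_load avoids constructing
# arbitrary Python objects from untrusted YAML input.
with open("cluster_config.yml") as ymlfile:
    cfg = yaml.safe_load(ymlfile)

print(cfg["configs"]["cluster_name"])  # test-cluster
print(cfg["configs"]["num_workers"])   # 2 (parsed as an int)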

In the DAG script I have created one task that creates the cluster. Before executing this task, we need to pass all the required arguments (cluster_name, project_id, etc.) via the default_args parameter. To read those parameters I created a readYML method; see the code below.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.contrib.operators import dataproc_operator

from airflow.models import Variable
import yaml

def readYML():
     print("inside readYML")
     global cfg
     file_name = "/home/airflow/gcs/data/cluster_config.yml"
     with open(file_name, 'r') as ymlfile:
          cfg = yaml.load(ymlfile)
     print(cfg['configs']['cluster_name'])

 # Default Arguments
 readYML()

 dag_name = Variable.get("dag_name")

  default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
     'start_date': datetime.now(),
     'email': ['[email protected]'],
     'email_on_failure': False,
     'email_on_retry': False,
     'retries': 1,
     'retry_delay': timedelta(minutes=5),
     #'cluster_name': cfg['configs']['cluster_name'],    
    }

    # Instantiate a DAG

    dag = DAG(dag_id='read_yml', default_args=default_args, 
    schedule_interval=timedelta(days=1))

    # Creating Tasks
    Task1 = DataprocClusterCreateOperator(
    task_id='create_cluster',
    dag=dag
    )

This code raises no error. When I upload it to the GCP Composer environment, no error notification appears, but the DAG is not runnable: no Run button shows up.

See the attached screenshot. I am using Python 3 and the composer-1.7.2-airflow-1.10.2 version.

This usually indicates an issue in your code that prevents Airflow from parsing it. If you have an Airflow environment set up, I suggest you try python your_dag.py and see if it gives you any error message. One thing I can see already is that dag=DAG should be dag=dag - Chengzhi
@Chengzhi No, that was a copy-paste mistake here; I actually create the task in a different Python file and copied the code from there. In any case, that kind of issue would show up as a red notification alert, and as you can see in the screenshot above there is no error, so that is not the issue. I already checked. - Bhagesh Arora
Can you verify that the Airflow scheduler is running as well? - Chengzhi
Yeah, it's running fine. - Bhagesh Arora
Of course there is an error in the code: your line default_args = { is indented one space too deep. Apart from that, YML (fdik.org/yml) is not the same as YAML, and the recommended extension for YAML files has been .yaml since at least Sep 2006, so you should consistently use YAML and .yaml in your code. - Anthon

1 Answer


According to the Data Stored in Cloud Storage page in the Cloud Composer docs:

To avoid a webserver error, make sure that data the webserver needs to parse a DAG (not run) is available in the dags/ folder. Otherwise, the webserver can't access the data or load the Airflow web interface.

Your DAG is attempting to open the YAML file under /home/airflow/gcs/data, which isn't present on the webserver. Put the file under the dags/ folder in your GCS bucket, and it will be accessible to the scheduler, workers, and webserver, and the DAG will work in the Web UI.
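
For illustration, here is a minimal sketch of the DAG after the move (assuming the file is renamed cluster_config.yaml and placed next to the DAG file in the dags/ folder; the Dataproc arguments come straight from the YAML in the question):

import os
from datetime import datetime, timedelta

import yaml
from airflow import DAG
from airflow.contrib.operators import dataproc_operator

# Resolve the YAML file relative to this DAG file, so the same path
# works on the webserver, the scheduler, and the workers.
config_path = os.path.join(os.path.dirname(__file__), "cluster_config.yaml")

with open(config_path) as ymlfile:
    cfg = yaml.safe_load(ymlfile)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # A fixed start_date; datetime.now() makes scheduling unpredictable.
    'start_date': datetime(2019, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))

create_cluster = dataproc_operator.DataprocClusterCreateOperator(
    task_id='create_cluster',
    cluster_name=cfg['configs']['cluster_name'],
    project_id=cfg['configs']['project_id'],
    zone=cfg['configs']['zone'],
    num_workers=cfg['configs']['num_workers'],
    worker_machine_type=cfg['configs']['worker_machine_type'],
    master_machine_type=cfg['configs']['master_machine_type'],
    dag=dag,
)

Resolving the path relative to __file__ means the code keeps working wherever Composer syncs the dags/ folder, and safe_load sidesteps PyYAML's unsafe default loader.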