2 votes

I have a working Dataflow pipeline that first runs setup.py to install some local helper modules. I now want to use Cloud Composer/Apache Airflow to schedule the pipeline. I've created my DAG file and placed it in the designated Cloud Storage DAG folder along with my pipeline project. The folder structure looks like this:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py

The part of my DAG that specifies the setup.py file looks like this:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": "{PROJECT-NAME}",
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

However, when I look at the logs in the Airflow Web UI, I get the error:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

I am not sure why it is unable to find the setup file. How can I run my Dataflow pipeline with the setup file/modules?

2 Answers

4 votes

If you look at the code for DataFlowPythonOperator, the main py_file can be a file inside a GCS bucket and is localized by the operator before the pipeline executes. However, I don't see anything like that for dataflow_default_options; the options appear to be simply copied and formatted.

Since the GCS dag folder is mounted on the Airflow workers using Cloud Storage FUSE, you should be able to access the file locally via the "dags_folder" configuration value. For example:

import os

from airflow import configuration

# Build the worker-local path to setup.py under the mounted dags folder
LOCAL_SETUP_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

You can then use the LOCAL_SETUP_FILE variable for the setup_file property in the dataflow_default_options.
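To make the path resolution concrete, here is a minimal sketch. It assumes the typical Cloud Composer layout, where the DAGs bucket is FUSE-mounted at /home/airflow/gcs/dags on the workers; the exact mount point may differ in your environment, so treat the literal path below as illustrative:

```python
import os

# Hypothetical value of the 'dags_folder' setting; in Cloud Composer the DAGs
# bucket is typically FUSE-mounted at this path on the Airflow workers.
dags_folder = '/home/airflow/gcs/dags'

# The local path mirrors the GCS folder layout under the mount point.
local_setup_file = os.path.join(dags_folder, 'Pipeline-Project', 'setup.py')
print(local_setup_file)  # /home/airflow/gcs/dags/Pipeline-Project/setup.py
```

Passing that local path as "setup_file" in dataflow_default_options (instead of the gs:// URI) lets the Dataflow runner find and stage the file, since it reads setup.py from the local filesystem, not from GCS.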

0 votes

Do you run Composer and Dataflow with the same service account, or are they separate? In the latter case, have you checked whether Dataflow's service account has read access to the bucket and object?