I have a working Dataflow pipeline that first runs setup.py to install some local helper modules. I now want to schedule the pipeline with Cloud Composer (Apache Airflow). I've created my DAG file and placed it in my Composer environment's Cloud Storage DAG folder along with my pipeline project. The folder structure looks like this:
{Composer-Bucket}/
    dags/
        DAG.py
        Pipeline-Project/
            Pipeline.py
            setup.py
            Module1/
                __init__.py
            Module2/
                __init__.py
            Module3/
                __init__.py
The part of my DAG that specifies the setup.py file looks like this:
resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": "{PROJECT-NAME}",
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py",
    },
)
However, when I look at the task logs in the Airflow web UI, I get the error:
RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.
I'm not sure why it can't find the setup file. How can I run my Dataflow pipeline so that the setup file and local modules are picked up?
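For context, the Composer docs say the bucket's dags/ folder is synced to /home/airflow/gcs/dags on the Airflow workers, so one thing I've been experimenting with is translating my gs:// URIs into that worker-local path before handing them to the operator. Here's the small helper I wrote for that (the mount point is my reading of the docs, and the bucket name below is just an example, not my real bucket):

```python
# Per the Cloud Composer docs, the contents of gs://{COMPOSER-BUCKET}/dags/
# are synced to this local directory on every Airflow worker.
LOCAL_DAGS_ROOT = "/home/airflow/gcs/dags"


def gcs_dag_path_to_local(gcs_uri, bucket):
    """Translate a gs:// URI under the bucket's dags/ folder into the
    worker-local path where Composer mounts it."""
    prefix = "gs://%s/dags/" % bucket
    if not gcs_uri.startswith(prefix):
        raise ValueError("%r is not under %s" % (gcs_uri, prefix))
    return LOCAL_DAGS_ROOT + "/" + gcs_uri[len(prefix):]


# Example with a placeholder bucket name (not my real bucket):
setup_local = gcs_dag_path_to_local(
    "gs://my-composer-bucket/dags/Pipeline-Project/setup.py",
    "my-composer-bucket",
)
```

I'd then pass `setup_local` as the `setup_file` option instead of the gs:// URI, but I'm not sure whether that's the intended approach.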