
I have a local Python package which I want to use inside an Apache Beam pipeline with the Dataflow runner. I tried to follow the instructions provided in the documentation: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ (section Local or non-PyPI Dependencies), but without success.

My package has the following structure:

my_common
├── __init__.py
└── shared
    ├── __init__.py
    └── something.py

The something.py file contains:

def hello_world():
    return "Hello"

The package was built using the python setup.py sdist command.
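
For reference, a minimal setup.py that produces such an sdist could look like the following (a simplified sketch, not my exact file; the name and version match the my_common-1.0.tar.gz artifact used below):

from setuptools import setup, find_packages

setup(
    name='my_common',
    version='1.0',
    # find_packages() discovers both my_common and my_common.shared,
    # since each directory contains an __init__.py
    packages=find_packages(),
)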

Now, I have an Apache Beam pipeline configured as follows:

import apache_beam as beam

pipeline_parameters = [
    '--project', project_id,
    '--staging_location', staging_location,
    '--temp_location', temp_location,
    '--max_num_workers', '1',  # argv values have to be strings
    '--extra_package', '/absolute/path/to/my/package/my_common-1.0.tar.gz',
]


p = beam.Pipeline('DataflowRunner', argv=pipeline_parameters)
# rest of the pipeline definition

One of the pipeline map functions has the following code, which uses my module:

from my_common.shared import something
logging.info(something.hello_world())
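
For completeness, here is a minimal sketch of how that call is used inside a map step (the log_hello helper and the step name are simplified placeholders for my real code):

import logging

import apache_beam as beam

from my_common.shared import something


def log_hello(element):
    # Logs the greeting coming from the shared package and passes the element through unchanged.
    logging.info(something.hello_world())
    return element

# inside the pipeline definition:
# ... | 'LogHello' >> beam.Map(log_hello) | ...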

Every time I submit this pipeline to Dataflow, I get the following error:

ImportError: No module named shared

The interesting part is that when I install this package (from the .tar.gz file) in another environment, I can import it and run the function without any problems. It seems to me that Dataflow does not install the package before running the pipeline.

What is the proper way of managing and deploying local Python dependencies to Google Dataflow?

//update: The solution described in https://stackoverflow.com/a/46605344/1955346 is not sufficient for my use case, since I need to keep my local packages in a completely different folder, and the setup.py for my pipeline already has some contents (I cannot reuse the external package's setup.py as suggested there).


1 Answer


Instead of providing it through --extra_package, provide it using --setup_file.

Use setuptools to define the setup.py that you will pass as the setup file; it will look somewhat as follows:

from setuptools import setup

setup(
    name="dataflow_pipeline_dependencies",
    version="1.0.0",
    author="Marcin Zablocki",
    author_email="[email protected]",
    description=("Custom python utils needed for dataflow cloud runner"),
    packages=[
        'my_common',
        'my_common.shared'  # subpackages are not included automatically, so list them explicitly
        ]
)
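
Alternatively, instead of listing every subpackage by hand, you can let setuptools discover them. A sketch of the same setup file using find_packages:

from setuptools import setup, find_packages

setup(
    name="dataflow_pipeline_dependencies",
    version="1.0.0",
    description="Custom python utils needed for dataflow cloud runner",
    # find_packages() picks up my_common and my_common.shared automatically,
    # provided both directories contain an __init__.py
    packages=find_packages()
)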

and pass it using the --setup_file parameter as follows:

pipeline_parameters = [
    '--project', project_id,
    '--staging_location', staging_location,
    '--temp_location', temp_location,
    '--max_num_workers', '1',  # argv values have to be strings
    '--setup_file', '/absolute/path/to/your/package/setup.py',
]


p = beam.Pipeline('DataflowRunner', argv=pipeline_parameters)
# rest of the pipeline definition

where /absolute/path/to/your/package/setup.py is the full path to the setup.py file described above. Note that --setup_file expects the path to the setup.py file itself, not just its directory, and the my_common package directory should live next to that file so that it gets picked up when the source distribution is built.
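
Regarding the update in the question: since your pipeline already has a setup.py with some contents, you do not need a separate setup file. You can extend the existing one so that it also ships my_common, assuming the my_common directory is copied (or symlinked) next to that setup.py. A rough sketch (the name and install_requires below are placeholders for whatever your file already contains):

from setuptools import setup, find_packages

setup(
    name="my_pipeline",    # placeholder for your existing project name
    version="0.1.0",
    install_requires=[],   # keep whatever dependencies you already declare here
    # find_packages() now also picks up my_common and my_common.shared,
    # assuming the my_common directory sits next to this setup.py
    packages=find_packages()
)

Then point --setup_file at this file as shown above.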