5 votes

After a lengthy search, I haven't found an example of a Dataflow / Beam pipeline that spans several files. Beam docs do suggest a file structure (under the section "Multiple File Dependencies"), but the Juliaset example they give has in effect a single code/source file (and the main file that calls it). Based on the Juliaset example, I need a similar file structure:

juliaset/__init__.py
juliaset/juliaset.py # actual code
juliaset/some_conf.py
__init__.py
juliaset_main.py
setup.py

Now I want to import some_conf from within juliaset/juliaset.py, which works when run locally but gives me an error when run on Dataflow:

INFO:root:2017-12-15T17:34:09.333Z: JOB_MESSAGE_ERROR: (8cdf3e226105b90a): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 706, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 446, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 363, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1133, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named package_name.juliaset.some_conf
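For context, the import style that survives pickling on a worker is the absolute one. Here is a minimal, runnable sketch; the temp-dir layout and the SETTING constant are made up to mirror the question's structure:

```python
import os
import sys
import tempfile

# Recreate the question's package layout in a temp dir (hypothetical names).
root = tempfile.mkdtemp()
pkg = os.path.join(root, "juliaset")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "some_conf.py"), "w") as f:
    f.write("SETTING = 42\n")

# With the project root on sys.path (locally) or the package installed via
# setup.py (on a Dataflow worker), this absolute import resolves the same way.
sys.path.insert(0, root)
from juliaset import some_conf  # absolute, rather than "from . import some_conf"
print(some_conf.SETTING)
```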

A full working example would be very much appreciated!

I am unclear about the question. You are asking for a working example, and the Juliaset you are referring to is one. Or are you saying it isn't working for you? Are you calling it as per the doc in the main file (including --setup_file)? – de1
The Juliaset example works as is, but its source code is essentially still all in a single file (juliaset/juliaset.py). I would like to split the program across several source files, such as juliaset/src1.py and juliaset/src2.py. Also, the Juliaset example is embedded inside the apache_beam package (so imports look like "from apache_beam.examples.complete.juliaset.juliaset import juliaset"). That's why I was asking for a full working example with many source files, preferably a "standalone" version that's not embedded in another project/package. – Mattias Arro
@Mattias Arro How did you resolve it? – Aryan087
I didn't, really; I just worked around it. I wanted to include mainly configuration and utility methods that I would use from many pipelines or other Python modules on the system. Now I load the configuration from GCS, and at times duplicate code across different pipelines, neither of which is ideal. I pulled some common code (like running precondition checks and pipeline cleanup / file merging) out of the Dataflow pipelines and into Airflow tasks, which orchestrate all my work (of which Dataflow is just one step); this considerably reduced duplicate code between pipelines. – Mattias Arro
Anyone resolve this? I am struggling with the same issue two years later... – Michael

1 Answer

2 votes

Can you verify that your setup.py contains a structure like:

import setuptools

setuptools.setup(
    name='my_project',  # avoid spaces in the distribution name
    version='1.0',
    install_requires=[],
    packages=setuptools.find_packages(),
)
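One thing worth checking is that find_packages() actually discovers the package, which requires the __init__.py files. A quick runnable sanity check (the temp-dir layout is hypothetical, mirroring the question's file structure):

```python
import os
import tempfile
import setuptools

# Mirror the question's layout in a temp dir; find_packages() only picks up
# directories that contain an __init__.py.
root = tempfile.mkdtemp()
pkg = os.path.join(root, "juliaset")
os.makedirs(pkg)
for name in ["__init__.py", "juliaset.py", "some_conf.py"]:
    open(os.path.join(pkg, name), "w").close()

found = setuptools.find_packages(where=root)
print(found)  # ['juliaset']
```

If the __init__.py files are missing, found comes back empty and the package is never staged to the workers, which produces exactly the ImportError in the question.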

Import your modules like from juliaset.juliaset import SomeClass

And when you call the Python script, use python -m juliaset_main (without the .py)

Not sure if you already tried this, but just to be sure.