2 votes

We've been running a Python pipeline in Datalab that reads image files from a bucket in Google Cloud Storage (importing google.datalab.storage). Originally we were using DirectRunner and this worked fine, but now we're trying to use DataflowRunner and we're hitting import errors. Even if we include "import google.datalab.storage" (or any variant thereof) inside the function run by the pipeline, we get errors such as "No module named 'datalab.storage'". We've also tried the save_main_session, requirements_file, and setup_file flags with no luck. How would we correctly access image files in Cloud Storage buckets in a Dataflow pipeline?

EDIT: My original error was due to specifying the requirements_file flag with incorrect syntax (i.e. passing "--requirements_file ./requirements.txt" as a single string). I think I've fixed the syntax there, but now I'm getting a different error. Here's a basic version of the code we're trying to run: a pipeline that reads files from a Google Cloud Storage bucket. We have a Datalab notebook with a cell containing the following Python code:

import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import StandardOptions
import google.datalab.storage as storage

bucket = "BUCKET_NAME"
shared_bucket = storage.Bucket(bucket)

# Create and set PipelineOptions. 
options = PipelineOptions(flags = ["--requirements_file", "./requirements.txt"])
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "PROJECT_NAME"
google_cloud_options.job_name = 'test-pipeline-requirements'
google_cloud_options.staging_location = 'gs://BUCKET_NAME/binaries'
google_cloud_options.temp_location = 'gs://BUCKET_NAME/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

def read_file(input_tuple):
  filepath = input_tuple[0]
  shared_object = shared_bucket.object(filepath)
  f = shared_object.read_stream()
  # More processing of f's contents
  return input_tuple

# File paths relative to the bucket
input_tuples = [("FILEPATH_1", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

Meanwhile, there is a file named "requirements.txt" in the same directory as the notebook, containing only the line

datalab==1.0.1

This code works fine if I use DirectRunner. However, when I use DataflowRunner, I get a CalledProcessError at "p.run()", with a stack trace ending in the following:

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _populate_requirements_cache(requirements_file, cache_dir)
    224               '--no-binary', ':all:']
    225   logging.info('Executing command: %s', cmd_args)
--> 226   processes.check_call(cmd_args)
    227
    228

/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyc in check_call(*args, **kwargs)
     38   if force_shell:
     39     kwargs['shell'] = True
---> 40   return subprocess.check_call(*args, **kwargs)
     41
     42

/usr/lib/python2.7/subprocess.pyc in check_call(*popenargs, **kwargs)
    538         if cmd is None:
    539             cmd = popenargs[0]
--> 540         raise CalledProcessError(retcode, cmd)
    541     return 0
    542

CalledProcessError: Command '['/usr/bin/python', '-m', 'pip', 'install', '--download', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--no-binary', ':all:']' returned non-zero exit status 1

It seems that pip's "--download" option is deprecated, but that command is constructed inside the apache_beam code. I've also tried different ways of specifying "requirements.txt", with and without the "--save_main_session" flag, and with and without the "--setup_file" flag, but no dice.
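One way to dig into this is to reproduce Beam's requirements-caching step outside the pipeline to surface pip's actual error message. This is only a debugging sketch (not from the original post): it mirrors the failing command in the traceback above, but uses the modern "pip download" subcommand instead of the deprecated "pip install --download"; the requirements path and cache directory are taken from that traceback.

import subprocess

# Hypothetical debugging sketch: re-run the same dependency-caching step that
# Beam executes, so pip's underlying error is printed directly.
subprocess.check_call([
    "python", "-m", "pip", "download",
    "-r", "./requirements.txt",
    "-d", "/tmp/dataflow-requirements-cache",
    "--no-binary", ":all:",
])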

Comment from Nef10: Can you edit your question to include a Minimal, Complete, and Verifiable example as well as the exact error message you are getting when trying to run this code?

2 Answers

4 votes

If your only usage of pydatalab is to read from GCS, then I would suggest using Dataflow's gcsio. Code example:

def read_file(input_tuple):
  filepath = input_tuple[0]
  with beam.io.gcp.gcsio.GcsIO().open(filepath, 'r') as f:
    # process f content
    pass

# Full gs:// paths rather than paths relative to the bucket
input_tuples = [("gs://bucket/file.jpg", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

pydatalab is pretty heavy, since it is more of a data exploration library used with Datalab or Jupyter. On the other hand, Dataflow's gcsio is natively supported in Dataflow pipelines.
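A possible alternative (not part of the original answer) is Beam's generic FileSystems API, which in recent Beam versions dispatches gs:// paths to GCS without referencing gcsio directly. A minimal sketch, assuming the same input_tuples of full gs:// paths as in the example above:

from apache_beam.io.filesystems import FileSystems

def read_file_with_filesystems(input_tuple):
  # Hypothetical variant of read_file using Beam's FileSystems API,
  # which resolves gs:// paths to the GCS filesystem.
  filepath = input_tuple[0]
  with FileSystems.open(filepath) as f:
    data = f.read()  # process the file contents as needed
  return [input_tuple]  # FlatMap expects an iterable

This would drop into the same pipeline in place of read_file, e.g. beam.FlatMap(read_file_with_filesystems).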

3 votes

The most likely issue is that you need to have Dataflow install the datalab PyPI package.

Typically you would do this by listing "datalab" in the requirements.txt file you upload to Dataflow. See https://cloud.google.com/dataflow/pipelines/dependencies-python
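The page linked above also covers a setup.py-based approach, which matches the --setup_file flag the question mentions. A minimal sketch (the package name and version pin are assumptions, not from the original answer):

# setup.py -- minimal sketch for shipping the datalab dependency to Dataflow
# workers via the --setup_file pipeline option.
import setuptools

setuptools.setup(
    name="dataflow-datalab-deps",        # hypothetical package name
    version="0.0.1",
    install_requires=["datalab==1.0.1"],  # assumed pin, matching requirements.txt
    packages=setuptools.find_packages(),
)

The pipeline would then be constructed with flags=["--setup_file", "./setup.py"] instead of (or in addition to) the --requirements_file flag.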