
I am writing a Dataflow pipeline that processes videos from a Google Cloud Storage bucket. The pipeline downloads each work item to the local filesystem and then re-uploads the results to a GCS bucket. This follows up on a previous question.

The pipeline works with the local DirectRunner, but I'm having trouble debugging it on the DataflowRunner.

The error reads:

File "run_clouddataflow.py", line 41, in process 
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file self._do_download(transport, file_obj, download_url, headers) 
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download download.consume(transport) File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume self._write_to_stream(result) 
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream with response: AttributeError: __exit__ [while running 'Run DeepMeerkat']

When trying to execute blob.download_to_file(file_obj) within:

storage_client = storage.Client()
bucket = storage_client.get_bucket(parsed.hostname)
blob = storage.Blob(parsed.path[1:], bucket)

#store local path
local_path = "/tmp/" + parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
  blob.download_to_file(file_obj)

print("Downloaded " + local_path)

I'm guessing that the workers don't have permission to write locally? Or perhaps there is no /tmp folder in the Dataflow container. Where should I write objects? It's hard to debug without access to the environment. Is it possible to access stdout from the workers for debugging purposes (a serial console?)
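
For reference, here is a minimal sketch of the same download step written against tempfile.gettempdir() instead of a hard-coded /tmp/, in case the worker's writable temp location differs; the function name is illustrative and the GCS handling mirrors the snippet above:

import os
import tempfile
from urlparse import urlparse

from google.cloud import storage

def download_to_temp(gcs_url):
  """Download a gs:// object into the worker's temp directory and return the local path."""
  parsed = urlparse(gcs_url)

  client = storage.Client()
  bucket = client.get_bucket(parsed.hostname)
  blob = storage.Blob(parsed.path[1:], bucket)

  # Ask the OS for a writable temp directory instead of assuming /tmp/ exists.
  local_path = os.path.join(tempfile.gettempdir(), parsed.path.split("/")[-1])

  with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)

  return local_path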

EDIT #1

I've tried explicitly passing credentials:

  try:
      credentials, project = google.auth.default()
  except:
      os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
      credentials, project = google.auth.default()

as well as writing to the current working directory instead of /tmp/:

local_path = parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
  blob.download_to_file(file_obj)

I'm still getting the same cryptic error on blob downloads from GCS.

The full pipeline script is below; setup.py is here.

import logging
import argparse
import json
import os
import csv
import apache_beam as beam
from urlparse import urlparse
from google.cloud import storage

##Namespaces inside Cloud Dataflow workers are not inherited,
##see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors; better to write ugly import statements than to miss a namespace

class PredictDoFn(beam.DoFn):
  def process(self,element):

    import csv
    from google.cloud import storage
    from DeepMeerkat import DeepMeerkat
    from urlparse import urlparse
    import os
    import google.auth


    DM=DeepMeerkat.DeepMeerkat()

    print(os.getcwd())
    print(element)

    #try adding credentials?
    #set credentials, inherited from the worker
    credentials, project = google.auth.default()

    #download element locally
    parsed = urlparse(element[0])

    #parse gcp path
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.get_bucket(parsed.hostname)
    blob = storage.Blob(parsed.path[1:], bucket)

    #store local path
    local_path = parsed.path.split("/")[-1]

    print('local path: ' + local_path)
    with open(local_path, 'wb') as file_obj:
      blob.download_to_file(file_obj)

    print("Downloaded" + local_path)

    #Assign input from DataFlow/manifest
    DM.process_args(video=local_path)
    DM.args.output="Frames"

    #Run DeepMeerkat
    DM.run()

    #upload back to GCS
    found_frames=[]
    for (root, dirs, files) in os.walk("Frames/"):
      for filename in files:
        if filename.upper().endswith(".JPG"):
          found_frames.append(os.path.join(root, filename))

    for frame in found_frames:

      #create GCS path
      path="DeepMeerkat/" + parsed.path.split("/")[-1] + "/" + frame.split("/")[-1]
      blob=storage.Blob(path,bucket)
      blob.upload_from_filename(frame)

def run():
  import argparse
  import os
  import apache_beam as beam
  import csv
  import logging
  import google.auth

  parser = argparse.ArgumentParser()
  parser.add_argument('--input', dest='input', default="gs://api-project-773889352370-testing/DataFlow/manifest.csv",
                      help='Input file to process.')
  parser.add_argument('--authtoken', default="/Users/Ben/Dropbox/Google/MeerkatReader-9fbf10d1e30c.json",
                      help='Path to a service account credentials JSON file.')
  known_args, pipeline_args = parser.parse_known_args()

  #set credentials, inherit from the worker environment if possible
  try:
      credentials, project = google.auth.default()
  except:
      os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
      credentials, project = google.auth.default()

  p = beam.Pipeline(argv=pipeline_args)

  vids = (p|'Read input' >> beam.io.ReadFromText(known_args.input)
       | 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
       | 'Run DeepMeerkat' >> beam.ParDo(PredictDoFn()))

  logging.getLogger().setLevel(logging.INFO)
  p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
Comments:

You can log and read the log data on the cloud UI. Would that suffice? Also, you should be able to write to local disk. I'll get back to you on that. - Pablo

Thanks Pablo. I'm checking out the new google.auth module; it might also be that the worker is not inheriting my credentials from Dataflow. I just added: try: credentials, project = google.auth.default(); except: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken; credentials, project = google.auth.default() - bw4sz

Added to the edits. - bw4sz

Updated the script to reflect the edits. - bw4sz

1 Answer


I spoke to the google-cloud-storage package maintainer; this was a known issue. Pinning the dependency versions in my setup.py to

REQUIRED_PACKAGES = ["google-cloud-storage==1.3.2","google-auth","requests>=2.18.0"]

fixed the issue.

https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3836
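
For context, here is a minimal sketch of how those pinned requirements might sit in a Dataflow setup.py, assuming a standard setuptools layout; the package name and version below are placeholders rather than values from the original project:

from setuptools import setup, find_packages

# Dependencies installed on each Dataflow worker at startup.
# google-cloud-storage is pinned to 1.3.2 to avoid the resumable-media
# incompatibility that raised "AttributeError: __exit__" on blob downloads.
REQUIRED_PACKAGES = [
    "google-cloud-storage==1.3.2",
    "google-auth",
    "requests>=2.18.0",
]

setup(
    name="deepmeerkat-pipeline",   # placeholder name
    version="0.0.1",               # placeholder version
    packages=find_packages(),
    install_requires=REQUIRED_PACKAGES,
)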