I am writing a Dataflow pipeline that processes videos from a Google Cloud Storage bucket. The pipeline downloads each work item to the local filesystem and then re-uploads the results back to the bucket. This follows on from a previous question.
The pipeline works locally with DirectRunner, but I'm having trouble debugging it on DataflowRunner.
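For context, I'm launching the job with roughly the following command (the project, bucket, and paths below are placeholders, not my real values):

python run_clouddataflow.py \
    --runner DataflowRunner \
    --project <my-project> \
    --temp_location gs://<my-bucket>/temp \
    --staging_location gs://<my-bucket>/staging \
    --setup_file ./setup.py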
The error reads:
File "run_clouddataflow.py", line 41, in process
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file self._do_download(transport, file_obj, download_url, headers)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download download.consume(transport) File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume self._write_to_stream(result)
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream with response: AttributeError: __exit__ [while running 'Run DeepMeerkat']
The error occurs when trying to execute blob.download_to_file(file_obj) within:
storage_client = storage.Client()
bucket = storage_client.get_bucket(parsed.hostname)
blob = storage.Blob(parsed.path[1:], bucket)

# store local path
local_path = "/tmp/" + parsed.path.split("/")[-1]
print('local path: ' + local_path)

with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)
print("Downloaded " + local_path)
I'm guessing that the workers don't have permission to write locally? Or perhaps there is no /tmp folder in the Dataflow container? Where should I write objects? It's hard to debug without access to the environment. Is it possible to access stdout from the workers for debugging purposes (a serial console?)
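For what it's worth, is switching from print() to the logging module the right way to surface per-worker output in the job's Stackdriver logs? Something like the sketch below is what I have in mind (the messages are just placeholders):

import logging

class PredictDoFn(beam.DoFn):
    def process(self, element):
        import logging
        import os
        # Log rather than print, hoping these lines appear in the
        # Dataflow job logs for each worker.
        logging.info("worker cwd: %s", os.getcwd())
        logging.info("processing element: %s", element)
        # ... rest of process() unchanged ...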
EDIT #1
I've tried explicitly passing credentials:
try:
    credentials, project = google.auth.default()
except:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
    credentials, project = google.auth.default()
as well as writing to the current working directory (os.getcwd()) instead of /tmp/:
local_path = parsed.path.split("/")[-1]
print('local path: ' + local_path)

with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)
I'm still getting the same cryptic error on blob downloads from GCP.
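To narrow down whether this is a permissions or filesystem problem, I'm considering adding a small probe at the top of process() along these lines (the helper name and prefix are just placeholders for debugging):

def _probe_worker_environment():
    # Debug-only helper (hypothetical name): check that the worker can write
    # a temp file and that default credentials resolve before downloading.
    import logging
    import os
    import tempfile
    import google.auth

    fd, probe_path = tempfile.mkstemp(prefix="dataflow_probe_")
    with os.fdopen(fd, 'wb') as f:
        f.write(b"ok")
    logging.info("wrote probe file: %s", probe_path)
    os.remove(probe_path)

    credentials, project = google.auth.default()
    logging.info("default credentials resolved, project: %s", project)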
The full pipeline script is below; setup.py is here.
import argparse
import csv
import json
import logging
import os

import apache_beam as beam
from urlparse import urlparse
from google.cloud import storage

## The namespace inside Cloud Dataflow workers is not inherited,
## see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors;
## it's better to write ugly import statements than to miss a namespace.
class PredictDoFn(beam.DoFn):
    def process(self, element):
        import csv
        from google.cloud import storage
        from DeepMeerkat import DeepMeerkat
        from urlparse import urlparse
        import os
        import google.auth

        DM = DeepMeerkat.DeepMeerkat()
        print(os.getcwd())
        print(element)

        # try adding credentials?
        # set credentials, inherited from the worker
        credentials, project = google.auth.default()

        # download element locally
        parsed = urlparse(element[0])

        # parse GCS path
        storage_client = storage.Client(credentials=credentials)
        bucket = storage_client.get_bucket(parsed.hostname)
        blob = storage.Blob(parsed.path[1:], bucket)

        # store local path
        local_path = parsed.path.split("/")[-1]
        print('local path: ' + local_path)

        with open(local_path, 'wb') as file_obj:
            blob.download_to_file(file_obj)
        print("Downloaded " + local_path)

        # Assign input from DataFlow/manifest
        DM.process_args(video=local_path)
        DM.args.output = "Frames"

        # Run DeepMeerkat
        DM.run()

        # upload extracted frames back to GCS
        found_frames = []
        for (root, dirs, files) in os.walk("Frames/"):
            for name in files:
                if name.upper().endswith(".JPG"):
                    found_frames.append(os.path.join(root, name))

        for frame in found_frames:
            # create GCS path
            path = "DeepMeerkat/" + parsed.path.split("/")[-1] + "/" + frame.split("/")[-1]
            blob = storage.Blob(path, bucket)
            blob.upload_from_filename(frame)
def run():
    import argparse
    import os
    import apache_beam as beam
    import csv
    import logging
    import google.auth

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input',
                        default="gs://api-project-773889352370-testing/DataFlow/manifest.csv",
                        help='Input file to process.')
    parser.add_argument('--authtoken',
                        default="/Users/Ben/Dropbox/Google/MeerkatReader-9fbf10d1e30c.json",
                        help='Service account credentials JSON to fall back on.')
    known_args, pipeline_args = parser.parse_known_args()

    # set credentials, inherited from the worker
    try:
        credentials, project = google.auth.default()
    except:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
        credentials, project = google.auth.default()

    p = beam.Pipeline(argv=pipeline_args)

    vids = (p | 'Read input' >> beam.io.ReadFromText(known_args.input)
              | 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
              | 'Run DeepMeerkat' >> beam.ParDo(PredictDoFn()))

    logging.getLogger().setLevel(logging.INFO)
    p.run()
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()