3
votes

I have an application that parallelizes the execution of Python objects that process data downloaded from Google Cloud Storage (my project bucket). The cluster is created with Google Dataproc. The problem is that the data is never downloaded! I wrote a test program to understand the issue. The following functions copy a file from the bucket and check whether creating files on the workers works at all:

from subprocess import call
from os.path import join

def copyDataFromBucket(filename,remoteFolder,localFolder):
  call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder]

def execTouch(filename,localFolder):
  call(["touch",join(localFolder,"touched_"+filename)])

I've tested these functions by calling them from a Python shell, and they work. But when I run the following code with spark-submit, the files are not downloaded (and no error is raised):

# ...
filesRDD = sc.parallelize(fileList)
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output'))
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output'))
# ...

The execTouch function works (I can see the files on each worker), but the copyDataFromBucket function does nothing.
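A quick way to make the failure visible (a sketch only; the helper name and the logging are mine) is to capture the exit code and stderr of the gsutil call, since call() silently discards them:

from subprocess import Popen, PIPE
from os.path import join

def copyDataFromBucketVerbose(filename, remoteFolder, localFolder):
  # Same gsutil invocation as above, but capture the exit code and stderr
  # so a failure on the worker shows up in the executor logs.
  proc = Popen(["gsutil", "-m", "cp", join(remoteFolder, filename), localFolder],
               stdout=PIPE, stderr=PIPE)
  out, err = proc.communicate()
  if proc.returncode != 0:
    print("gsutil failed for %s (exit %d): %s" % (filename, proc.returncode, err))
  return proc.returncode

Running it through map/collect instead of foreach, e.g. filesRDD.map(lambda f: copyDataFromBucketVerbose(f, remoteBucketFolder, '/tmp/output')).collect(), also brings the exit codes back to the driver.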

So what am I doing wrong?

One precision: I use the Anaconda2 package to run my application, but I had to set the CLOUDSDK_PYTHON variable to /usr/bin/python for gsutil to work. – ma3oun

If you were to run gsutil -m cp ... with bash or in your shell, does that currently work? – Kristian

Yes, it works fine, both on the master and on each of the workers. – ma3oun

1 Answer

2
votes

The problem was clearly the Spark context. Replacing the call to "gsutil" with a call to "hadoop fs" solves it:

from subprocess import call
from os.path import join

def copyDataFromBucket(filename,remoteFolder,localFolder):
  call(["hadoop","fs","-copyToLocal",join(remoteFolder,filename),localFolder]

I also did a test to send data to the bucket: one only needs to replace "-copyToLocal" with "-copyFromLocal".
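For reference, a minimal sketch of the upload direction (the helper name is mine; hadoop fs -copyFromLocal takes the local source first and the destination second, and on Dataproc the Cloud Storage connector lets "hadoop fs" address gs:// paths directly):

from subprocess import call
from os.path import join

def copyDataToBucket(filename,localFolder,remoteFolder):
  # hadoop fs -copyFromLocal <local source> <destination gs:// folder>
  call(["hadoop","fs","-copyFromLocal",join(localFolder,filename),remoteFolder])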