
I'm trying to download large files over HTTP and upload them to GCS using the Apache Beam Python SDK (2.6.0) on Dataflow.

import logging

from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

# `options`, `filename` (a gs:// path) and `response` (a streaming
# requests.Response) are defined elsewhere in the pipeline.
gcs = GCSFileSystem(options)
logging.info("Writing to file {}".format(filename))
f = gcs.create(filename, compression_type=CompressionTypes.UNCOMPRESSED)
chunk_size = 512 * 1024
for chunk in response.iter_content(chunk_size=chunk_size):
    if chunk:
        f.write(chunk)
    else:
        break
logging.info("Closing file {}".format(filename))
f.close()
logging.info("Closed file {}".format(filename))

This approach works well for small files (~KB), but I'm struggling to make it work for larger files (~GB).

Logging indicates that it gets stuck in f.close(), and no file has been written to GCS yet. I've dug into the code, and it seems that GCSFileSystem instantiates a GcsBufferedWriter, which itself writes into a multiprocessing pipe that gets consumed by transfer.Upload.

I don't have many clues about what could cause this issue. I suspect connections/pipes are being reset or silently broken along the way (the HTTP server I'm downloading from has very low throughput, and I'm parallelizing calls with gevent), or simply that transfer.Upload has performance issues.

When I check machine stats, I see much more incoming traffic (20 MB/s) than outgoing (200 KB/s) and basically no disk writes, which makes me wonder where all the piped data goes.


1 Answer


In the meantime I've switched to the google-cloud-storage Python client, which indeed seems to have better performance and works as expected. Though I'm observing some transient errors:

File "dataflow/make_training_chips.py", line 93, in process
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/client.py", line 71, in __init__
_http=_http)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 215, in __init__
_ClientProjectMixin.__init__(self, project=project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 169, in __init__
project = self._determine_default(project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/client.py", line 182, in _determine_default
return _determine_default_project(project)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/_helpers.py", line 179, in _determine_default_project
_, project = google.auth.default()
File "/usr/local/lib/python2.7/dist-packages/google/auth/_default.py", line 306, in default
  raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
  DefaultCredentialsError: Could not automatically determine credentials. 
  Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials 
  and re-run the application. For more information, please see 
  https://developers.google.com/accounts/docs/application-default-credentials. 
  [while running 'ParDo(GenerateTraingChips)']

which happens quite often on my workers (though I retry the task with exponential backoff...).
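For reference, a minimal sketch of the storage-client approach, in a conservative variant that spools the download to a local temporary file before uploading; the URL, bucket and object path are placeholders, not the actual pipeline code:

import logging
import tempfile

import requests
from google.cloud import storage

# Hypothetical names; the real URL, bucket and object path come from the pipeline.
url = "http://example.com/large_file"
client = storage.Client()
bucket = client.bucket("my-bucket")
blob = bucket.blob("path/to/large_file")

# Spool the HTTP download to a local temporary file, then hand it to the
# storage client, which performs a resumable upload to GCS.
with tempfile.NamedTemporaryFile() as tmp:
    response = requests.get(url, stream=True)
    for chunk in response.iter_content(chunk_size=512 * 1024):
        if chunk:
            tmp.write(chunk)
    tmp.flush()
    tmp.seek(0)
    logging.info("Uploading to gs://my-bucket/path/to/large_file")
    blob.upload_from_file(tmp)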