I'm trying to download large files over HTTP and upload them to GCS using the Apache Beam Python SDK (2.6.0) on Dataflow.
import logging

from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

# options: the pipeline's PipelineOptions; filename: the target gs:// path;
# response: a streaming HTTP response (requests, stream=True).
gcs = GCSFileSystem(options)

logging.info("Writing to file {}".format(filename))
f = gcs.create(filename, compression_type=CompressionTypes.UNCOMPRESSED)

# Stream the download into GCS in 512 KiB chunks.
chunk_size = 512 * 1024
for chunk in response.iter_content(chunk_size=chunk_size):
    if chunk:  # skip keep-alive chunks
        f.write(chunk)
    else:
        break

logging.info("Closing file {}".format(filename))
f.close()
logging.info("Closed file {}".format(filename))
This approach works well for small files (a few KB), but I'm struggling to make it work for larger files (several GB).
Logging indicates that it gets stuck in f.close(), and no file has yet been written to GCS.
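To confirm the writer is actually receiving data at the rate I expect before the close, I'm planning to instrument the loop along these lines (a minimal sketch; written_bytes and the 30-second log interval are just values I picked):

import logging
import time

# Count the bytes handed to the GCS writer and log progress periodically, to
# tell whether the hang happens during the writes or only inside f.close().
written_bytes = 0
last_log = time.time()
for chunk in response.iter_content(chunk_size=chunk_size):
    if not chunk:
        break
    f.write(chunk)
    written_bytes += len(chunk)
    if time.time() - last_log > 30:
        logging.info("Wrote %d bytes so far to %s", written_bytes, filename)
        last_log = time.time()

logging.info("Total %d bytes written, closing %s", written_bytes, filename)
start = time.time()
f.close()
logging.info("Closed %s after %.1fs", filename, time.time() - start)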
I've dug into the code, and it seems that GCSFileSystem instantiates a GcsBufferedWriter, which itself writes into a multiprocessing Pipe that gets ingested by transfer.Upload.
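To isolate transfer.Upload from the HTTP download (and from gevent), I'm thinking of pushing a few GB of locally generated bytes through the same writer, roughly like this (a sketch only; the bucket path and the ~2 GiB size are arbitrary test values, and I'm assuming gcsio.GcsIO().open is the underlying entry point):

import logging

from apache_beam.io.gcp import gcsio

# Push ~2 GiB of zero bytes through the GCS writer with no HTTP download or
# gevent in the picture, to see whether close() still hangs.
test_path = "gs://my-bucket/upload-test.bin"  # hypothetical test object
chunk = b"\0" * (512 * 1024)
writer = gcsio.GcsIO().open(test_path, mode="w")
for _ in range(4096):  # 4096 * 512 KiB ~= 2 GiB
    writer.write(chunk)
logging.info("Closing %s", test_path)
writer.close()
logging.info("Closed %s", test_path)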
I don't have many clues about what could cause this issue. I suspect connections/pipes being reset or silently broken in the process (the HTTP server I'm downloading from has very low throughput and I'm parallelizing calls with gevent), or simply transfer.Upload having some performance issues.
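To rule out gevent's monkey-patching interfering with the writer's pipe and upload machinery, I'm also considering restricting the patching to sockets and leaving thread/os primitives alone (just a hypothesis about the interaction, not a confirmed fix):

import gevent.monkey

# Patch sockets/SSL for the slow parallel HTTP downloads, but leave threading,
# os and subprocess untouched so Beam's internal upload path is not affected.
gevent.monkey.patch_all(thread=False, os=False, subprocess=False)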
When I check the machine stats, I see much more incoming traffic (~20 MB/s) than outgoing (~200 KB/s) and basically no disk writes, which makes me wonder where all the piped data goes.
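As a fallback, I'm considering decoupling the GCS upload from the slow download by staging each file to local disk first, so the writer is fed at disk speed rather than at the server's throughput. A rough sketch (local_path is just a per-download temp file I'd create):

import shutil
import tempfile

from apache_beam.io.filesystems import FileSystems

# Stream the HTTP response to a local temp file first, then copy that file to
# GCS in one pass.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    for chunk in response.iter_content(chunk_size=512 * 1024):
        if chunk:
            tmp.write(chunk)
    local_path = tmp.name

src = open(local_path, "rb")
dst = FileSystems.create(filename)
shutil.copyfileobj(src, dst, length=512 * 1024)
dst.close()
src.close()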