3
votes

I'm working on a pipeline that reads ~5 million files from a Google Cloud Storage (GCS) directory. I have it configured to run on Google Cloud Dataflow.

The problem is that when I start the pipeline, it takes hours "computing the size" of all of the files:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

As you can see, it took an hour and a half (5549 seconds) to compute the size of about 5.5M files, then it started all over again from the beginning! The second pass took another 2 hours, then it started a third time! As of the time of writing, the job is still not available in the Dataflow console, which leads me to believe this is all happening on my local machine and not taking advantage of any distributed computing.

When I test the pipeline with a smaller input dataset (2 files), it repeats the size estimation 4 times:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

At this rate, it will take about 8 hours just to run the GCS size estimation over all 5.5M files 4 times, all before the Dataflow job has even started.

My pipeline is configured with the --runner=DataflowRunner option, so it should be running in Dataflow:

python bigquery_import.py --runner=DataflowRunner #other options...
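
The other options are the standard Dataflow ones; purely for illustration (the project and bucket names below are placeholders, not my real settings), the full command looks roughly like this:

python bigquery_import.py \
    --input=gs://project/dir \
    --runner=DataflowRunner \
    --project=my-project \
    --region=us-central1 \
    --temp_location=gs://my-bucket/tmp \
    --staging_location=gs://my-bucket/staging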

The pipeline reads from GCS like this:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)

# Pass the remaining flags (runner, project, etc.) through to Beam.
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    # Read the lines of every .har.gz file matching the glob.
    files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')

Refer to bigquery_import.py on GitHub for the full code.

I'm confused about why this tedious process is happening outside of the Dataflow environment and why it needs to be done multiple times. Am I reading the files from GCS correctly, or is there a more efficient way?

FWIW I'm updating an old Dataflow Java SDK implementation which does not have this issue. Source code for reference: github.com/HTTPArchive/bigquery/blob/master/dataflow/java/src/… - Rick Viscomi

1 Answer

5
votes

Thanks for reporting this. Beam has two transforms for reading text: ReadFromText and ReadAllFromText. ReadFromText will run into this issue, but ReadAllFromText shouldn't.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

The downside of ReadAllFromText is that it won't perform dynamic work rebalancing, but this should not be an issue when reading a large number of files.
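
As a rough sketch of the switch (untested, and reusing the gs://project/dir/*.har.gz pattern from the question), you create a PCollection containing the file pattern and feed it to ReadAllFromText instead of passing the glob to ReadFromText directly:

with beam.Pipeline(options=pipeline_options) as p:
    files = (
        p
        # A PCollection holding the file pattern(s) to expand.
        | beam.Create(['gs://project/dir/*.har.gz'])
        # Expands each pattern and reads the matched files on the workers,
        # avoiding the up-front size estimation that ReadFromText performs.
        | beam.io.ReadAllFromText()
    )

With this shape the glob expansion should happen inside the job rather than during local pipeline construction, which is where the long listing step was happening.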

Created https://issues.apache.org/jira/browse/BEAM-9620 for tracking issues with ReadFromText (and file-based sources in general).