I'm working on a pipeline that reads ~5 million files from a Google Cloud Storage (GCS) directory. I have it configured to run on Google Cloud Dataflow.
The problem is that when I start the pipeline, it takes hours "computing the size" of all of the files:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
As you can see, it took an hour and a half (5,549 seconds) to compute the sizes of about 5.5M files, then it started all over again from the beginning! The second pass took another two hours (7,563 seconds), and then it started a third time! As of the time of writing, the job still isn't visible in the Dataflow console, which leads me to believe that all of this is happening on my local machine rather than taking advantage of any distributed computing.
When I test the pipeline with a smaller input dataset (2 files), it repeats the size estimation 4 times:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
At this rate (roughly 1.5 to 2 hours per pass, and apparently 4 passes), it will take about 8 hours just to perform the GCS size estimation of all 5.5M files, all before the Dataflow job has even started.
My pipeline is configured with the --runner=DataflowRunner option, so it should be running in Dataflow:
python bigquery_import.py --runner=DataflowRunner #other options...
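For context, the full launch command looks roughly like the following; the project, region, and bucket names here are placeholders rather than my real values:

python bigquery_import.py \
    --runner=DataflowRunner \
    --project=my-project \
    --region=us-central1 \
    --temp_location=gs://my-bucket/temp \
    --staging_location=gs://my-bucket/staging \
    --input='gs://project/dir/*.har.gz'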
The pipeline reads from GCS like this:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        required=True,
        help='Input Cloud Storage directory to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
Refer to bigquery_import.py on GitHub for the full code.
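For what it's worth, one alternative pattern I've come across (but have not tried on this dataset) is to pass the glob into the pipeline as data and let ReadAllFromText expand it on the workers instead of at construction time. This is only a sketch based on the textio documentation; I don't know whether it actually avoids the repeated size estimation:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

# pipeline_options built exactly as in the snippet above
with beam.Pipeline(options=pipeline_options) as p:
    files = (
        p
        | 'FilePatterns' >> beam.Create(['gs://project/dir/*.har.gz'])  # glob as an element
        | 'ReadAll' >> ReadAllFromText()  # matching and reading happen inside the job
    )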
I'm confused about why this tedious process is happening outside of the Dataflow environment and why it needs to be done multiple times. Am I reading the files from GCS correctly, or is there a more efficient way?