0
votes

Let me preface everything by first stating I am just getting to know my way around working with Beam's Python SDK and GCP Dataflow!

The issue: My pipeline has been working great for almost all use cases. No errors that I can complain about. I just have some questions about some possible bottlenecks or optimizations I could make. I have noticed that my pipeline execution times jump to almost over 3 hours when working with a gzipped file of size ~50mb. Not entirely sure if there is any way to speed up that part. Below is a screenshot of the log warnings I see a bunch before the job eventually successfully completes.

Log Output details from Dataflow

Here is a relevant snippet of the pipeline:

if __name__ == '__main__':

    parser = argparse.ArgumentParser(
        description='Beam Pipeline: example'
    )
    parser.add_argument('--input-glob',
                        help='Cloud Path To Raw Data',
                        required=True)
    parser.add_argument('--output-bq-table',
                        help='BQ Native {dataset}.{table}')
    known_args, pipeline_args = parser.parse_known_args()

    with beam.Pipeline(argv=pipeline_args) as p:

        json_raw = (p | 'READING RAW DATA' >> beam.io.ReadFromText(known_args.input_glob)
            | 'JSON-ing' >> beam.Map(lambda e: json.loads(e))
        )

Extra info:

  1. I am using Airflow's Dataflowhook to launch this.
  2. I've played around with different machine types hoping that throwing more compute power at the problem would make it go away, but so far no luck.

  3. The following pipeline execution params:

--runner=DataflowRunner \
--project=example-project \
--region=us-east4 \
--save_main_session \
--temp_location=gs://example-bucket/temp/ \
--input-glob=gs://example-bucket/raw/  \
--machine_type=n1-standard-16 \
--job_name=this-is-a-test-insert
1

1 Answers

1
votes

Beam has numerous optimizations that allow it to split its processing of files into multiple workers. Furthermore, it is able to split a single file to be consumed by multiple workers in parallel.

Unfortunately, this is not possible with gzip files. This is because gzip files are compressed into a single block that has to be decompressed serially. When the Beam worker reads this file, it has to read the whole thing serially.

There are some compression formats that allow you to read them in parallel (these are usually 'multiblock' formats). Unfortunately, I believe the Beam Python SDK only supports serial formats at the moment.

If you need to have your pipeline work this way, you could try adding a beam.Reshuffle operation after ReadFromText. By doing this, your pipeline will still read the file serially, but apply all downstream operations in parallel (see this Performance section in the PTransform guide to see why this is the case).

Some other alternatives:

  • Separate your data into multiple gzip files.
  • Decompress it before consuming it in the pipeline.
  • (half joking/half serious) Contribute a multiblock compression support for Beam? : )) :))