
I have been writing an Apache Beam/Dataflow pipeline for a use case with 1M+ files stored in a GCS bucket; each file's path is also needed in the BigQuery row output. Each input file is a single-line JSON file.

Here is my current pipeline snippet:

    input_file_path = 'gs://' + BUCKET + '/**'

    with beam.Pipeline(options=options) as p:
        (p | 'Reading input file' >> beam.io.ReadFromTextWithFilename(input_file_path)
         | 'Converting from json to dict' >> beam.ParDo(JSONtoDict())
         | 'Write entries into Bigquery' >> beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
         )


So I am using ReadFromTextWithFilename from the Beam Python SDK.
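
For reference, JSONtoDict parses each line and attaches the filename; a minimal sketch of what it does, assuming a hypothetical file_path column in the output row:

    import json
    import apache_beam as beam

    class JSONtoDict(beam.DoFn):
        # ReadFromTextWithFilename emits (filename, line) tuples.
        def process(self, element):
            filename, line = element
            row = json.loads(line)
            row['file_path'] = filename  # hypothetical column name
            yield row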

While testing, it works for ~1000 JSON files as expected. However, I am not sure it will scale to the >1M files in the full data set I plan to run this pipeline on (via Google Dataflow).

In the Java SDK I came across withHintMatchesManyFiles, but I can't find an equivalent in the Python SDK. Should I switch to the Java SDK for this functionality?

Is there another way of getting the filename each line comes from that will scale to the 1M+ input files?


2 Answers

1 vote

Not sure what your hesitation is. Could you explain what makes you think it will not work for a larger number of files?

You can use MatchFiles to match and read multiple files.

    import apache_beam as beam
    from apache_beam.io import fileio

    with beam.Pipeline() as pipeline:
        readable_files = (
            pipeline
            # Expand the glob into file metadata, then open each match.
            | fileio.MatchFiles('hdfs://path/to/*.txt')
            | fileio.ReadMatches()
            # Redistribute the matched files evenly across workers.
            | beam.Reshuffle())
        files_and_contents = (
            readable_files
            # Emit (path, contents) pairs; metadata.path is the filename.
            | beam.Map(lambda x: (x.metadata.path, x.read_utf8())))
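
Applied to your pipeline, a sketch (reusing your BUCKET, options, table_spec, and table_schema, with a hypothetical file_path column carrying the path):

    import json
    import apache_beam as beam
    from apache_beam.io import fileio

    def to_row(path_and_contents):
        # Each input file is single-line JSON; attach the path for BigQuery.
        path, contents = path_and_contents
        row = json.loads(contents)
        row['file_path'] = path  # hypothetical column name
        return row

    with beam.Pipeline(options=options) as p:
        (p
         | fileio.MatchFiles('gs://' + BUCKET + '/**')
         | fileio.ReadMatches()
         | beam.Reshuffle()
         | beam.Map(lambda f: (f.metadata.path, f.read_utf8()))
         | beam.Map(to_row)
         | beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))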

References:

  1. https://beam.apache.org/documentation/patterns/file-processing/
  2. https://beam.apache.org/releases/pydoc/2.12.0/apache_beam.io.fileio.html

0 votes

You can try the existing Python transforms. There's a known issue where Dataflow jobs can fail because the source creates too many splits. The Java withHintMatchesManyFiles hint works around this by reading with a ParDo, but that disables dynamic work rebalancing. Dataflow Runner v2 supports large globs while still supporting dynamic work rebalancing.
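
For completeness, a sketch of enabling Runner v2 via pipeline options; the use_runner_v2 experiment flag is only needed on older SDK versions, since newer Beam releases enable Runner v2 on Dataflow by default:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Sketch: explicitly request Dataflow Runner v2 on an older SDK.
    # Newer Beam SDK releases use Runner v2 by default.
    options = PipelineOptions(
        runner='DataflowRunner',
        experiments=['use_runner_v2'],
        # project, region, temp_location, etc. go here as usual
    )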