3 votes

I have to write a Dataflow job in Python that will read two different .csv files from GCS, perform a join, apply a transformation to the joined result, and finally write it to a BigQuery table.

I am very new to this. After a lot of R&D I learned that the whole pipeline can be built with apache_beam, and I finally found a template, but I still have a lot of confusion about the points below.

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = 'auth_file.json'


class DataTransformation:
    """A helper class that translates a CSV into a format BigQuery will accept."""

    def __init__(self):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        # Here we read the output schema from a json file. This is used to
        # specify the types of data we are writing to BigQuery.
        self.schema = os.path.join(dir_path, 'resources',
                                   'gs://wahtch_dog_dataflow/schema.json')

    # Read the raw contents of a file so it can later be parsed into a
    # BigQuery-savable dictionary.
    def read_all_from_url(self, url):
        with FileSystems.open(url) as f:
            return f.read()

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default='gs://wahtch_dog_dataflow/demo.csv')

    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='watchdog_output.transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataTransformation is a class we built in this script to hold the logic
    # for transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()
    url = "gs://smart-ivr-dl-pushed-data"
    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (
     p | beam.Create(urls)
       | 'Reading latest file' >> beam.Map(data_ingestion.read_all_from_url)
    
     # This stage of the pipeline translates from a CSV file single row
     # input as a string, to a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
         beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # The schema for the output table. The simplest form is a
             # "fieldName:fieldType" string; here we pass the schema loaded
             # by the DataTransformation helper above.
             schema=data_ingestion.schema,

             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()

        
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Questions I have:

  1. read_all_from_url: How do I read multiple files here, and what is url? Is it the name of a bucket, or a full storage path?

  2. How do I read the schema from a bucket? (I found the pattern above somewhere, but I doubt it can really read the schema like that.)

  3. How do I do a transformation, e.g. a groupby? (See the small pandas sketch after this list for the kind of step I mean.)
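
To make question 3 concrete, the kind of transformation I mean looks like this in plain pandas (the column names are only examples, not my real schema):

import pandas as pd

# Example data only: "category" and "duration" are placeholder column names.
df = pd.DataFrame({
    "category": ["a", "b", "a"],
    "duration": [10, 20, 30],
})

# Group by one column and aggregate another; this is the kind of step
# I want to run on the joined data inside the pipeline.
summary = df.groupby("category", as_index=False)["duration"].sum()
print(summary)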

Update:

I am able to read the two files from the directory, but each file acts as its own PCollection. My logical steps are: 1) read the two files from the local directory, 2) join both dataframes with a join operation (this is where I am stuck), 3) perform some transformation on the joined dataframe.

import pandas as pd


class ReadOrc(beam.DoFn):
    def process(self, element):
        # Read one CSV path into a pandas DataFrame and emit the whole frame.
        df = pd.read_csv(element)
        yield df


csv_lines = (p | beam.Create(urls)
               | 'Reading latest file' >> beam.ParDo(ReadOrc())
               # transform() is the (not yet written) DoFn where I want to do
               # the join and the preprocessing.
               | 'transform' >> beam.ParDo(transform()))

The code above reads the two files from the directory, so the PCollection holds something like (df1, df2).

Now, in transform, I want to join both dataframes and do the preprocessing steps.
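
For reference, the result I am after would look roughly like this in plain pandas, outside of Beam ("id" is just a placeholder join key, not my real column):

import pandas as pd

# Placeholder frames standing in for the two CSV files; "id" is an
# assumed join key, not my real column name.
df1 = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
df2 = pd.DataFrame({"id": [1, 2], "score": [10, 20]})

# This is the join I want to reproduce inside the pipeline, followed by
# some preprocessing on the merged frame.
joined = df1.merge(df2, on="id", how="inner")
print(joined)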


2 Answers

1 vote

I've removed all the extra setup and configuration, and I'm sharing a small pipeline that should, more or less, do what you need.

But consider that BigQuery should be able to import a single CSV file by itself without using a Dataflow job. Would this be more helpful?
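
For example, with the BigQuery Python client a CSV already sitting in GCS can be loaded directly. This is only a sketch, and the bucket path and table id below are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# Placeholder locations: substitute your own bucket path and table id.
uri = "gs://your-bucket/demo.csv"
table_id = "your-project.watchdog_output.transformed"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,   # assumes the CSV has a header row
    autodetect=True,       # let BigQuery infer the schema
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Start the load job and wait for it to finish.
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()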


If you would still like to import into BQ using Dataflow, then given your input I recommend trying the following pipeline, which should more or less do the trick:

import csv
import io
import logging

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(
 p
 | beam.Create(urls)
 | 'Finding latest file' >> fileio.MatchAll()
 | 'Get file handlers' >> fileio.ReadMatches()
 | 'Read each file handler' >> beam.FlatMap(
       lambda rf: csv.reader(io.TextIOWrapper(rf.open())))
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         known_args.output,
         schema=schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

p.run().wait_until_finish()

If you're reading the CSVs into DataFrames, you can yield the rows from df.iterrows() in your DoFn (rather than yielding the whole DataFrame); this breaks the DataFrame down into individual rows, which you can then key and join.
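
As a rough sketch of that approach, assuming both CSVs share an "id" column (a placeholder name, substitute your real join key) and that pandas can read the paths you pass in, you can key each row and join with CoGroupByKey:

import apache_beam as beam
import pandas as pd


class CsvToKeyedRows(beam.DoFn):
    """Reads one CSV path into a DataFrame and emits (key, row_dict) pairs."""
    def process(self, path):
        df = pd.read_csv(path)
        for _, row in df.iterrows():
            # "id" is an assumed join column; the paths below are placeholders too.
            yield row["id"], row.to_dict()


with beam.Pipeline() as p:
    left = (p | 'Left path' >> beam.Create(['gs://your-bucket/left.csv'])
              | 'Left rows' >> beam.ParDo(CsvToKeyedRows()))
    right = (p | 'Right path' >> beam.Create(['gs://your-bucket/right.csv'])
               | 'Right rows' >> beam.ParDo(CsvToKeyedRows()))

    joined = (
        {'left': left, 'right': right}
        | 'Join on id' >> beam.CoGroupByKey()
        # Each element is (id, {'left': [...], 'right': [...]}); merge the
        # matching row dicts into single records.
        | 'Merge rows' >> beam.FlatMap(
            lambda kv: [{**l, **r}
                        for l in kv[1]['left']
                        for r in kv[1]['right']]))

CoGroupByKey groups the left and right rows that share a key, so the final FlatMap can combine each matching pair into one record, ready for a BigQuery write.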

0 votes

You may also want to consider the Beam DataFrames API.
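
A minimal sketch of what that could look like, assuming both files share an "id" column (just a placeholder) and that the pandas operations you need are among those the DataFrames API supports:

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection

with beam.Pipeline() as p:
    # Read each CSV into a deferred Beam DataFrame (the paths are placeholders).
    df1 = p | 'Read first' >> read_csv('gs://your-bucket/first.csv')
    df2 = p | 'Read second' >> read_csv('gs://your-bucket/second.csv')

    # Join with a pandas-style merge; "id" is an assumed shared column.
    joined = df1.merge(df2, on='id')

    # Convert the deferred result back to a regular PCollection so that
    # further Beam transforms (e.g. beam.io.WriteToBigQuery) can follow.
    rows = to_pcollection(joined)

The merge is evaluated as a deferred, distributed operation, and to_pcollection hands the joined rows back to the ordinary Beam API.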