I have to write a Dataflow job in Python that reads two different .csv files from GCS, performs a join operation, applies a transformation to the joined result, and finally writes it to a BigQuery table.
I am very new to this. After a lot of research I learned that the whole pipeline can be built with apache_beam, and I finally found a template, but I still have a lot of confusion about the points below.
import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = 'auth_file.json'
class DataTransformation:
    """A helper class that translates a CSV into a format BigQuery will accept."""

    def __init__(self):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        # Here we read the output schema from a json file. This is used to
        # specify the types of data we are writing to BigQuery.
        self.schema = os.path.join(dir_path, 'resources',
                                   'gs://wahtch_dog_dataflow/schema.json')

# Parse the input csv and convert it into a BigQuery-savable dictionary.
class read_all_from_url(beam.DoFn):
    def process(self, url):
        with FileSystems.open(url) as f:
            yield f.read()

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
             'a file in a Google Storage Bucket.',
        default='gs://wahtch_dog_dataflow/demo.csv')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output BQ table to write results to.',
        default='watchdog_output.transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)
    # DataTransformation is a class we built in this script to hold the logic
    # for transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()
    urls = ["gs://smart-ivr-dl-pushed-data"]

    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | beam.Create(urls)
     | 'Reading latest file' >> beam.ParDo(read_all_from_url())
     # This stage of the pipeline translates from a CSV file single row
     # input as a string, to a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
         beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the simplest way of defining a schema:
             # fieldName:fieldType
             schema=data_ingestion.schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Questions I have:
read_all_from_url: How do I read multiple files here, and what exactly is url? Is it the name of a bucket, or a full storage path? See the sketch below for what I currently assume.
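My current assumption (which may be wrong) is that url is the full gs:// path of a single object, and that beam.Create should be given one such path per file. The file names below are just placeholders, not my real data:

# Assumption: `urls` is a list of full object paths, not just a bucket name.
# The file names are placeholders.
urls = [
    'gs://wahtch_dog_dataflow/file_1.csv',
    'gs://wahtch_dog_dataflow/file_2.csv',
]

raw_files = (
    p
    | beam.Create(urls)                                     # one element per path
    | 'Read each file' >> beam.ParDo(read_all_from_url()))  # whole file contents as one string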
How do I read the schema from a bucket? I found the approach above somewhere, but I doubt that joining a gs:// path onto a local directory with os.path.join can actually read the schema. A guess at an alternative follows.
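Guessing here: since FileSystems.open already understands gs:// paths, maybe the schema file can be loaded directly instead of being joined onto a local directory. read_schema is just a helper name I made up:

import json

def read_schema(schema_path):
    # Assumption: schema_path is the full GCS path, e.g. 'gs://wahtch_dog_dataflow/schema.json'
    with FileSystems.open(schema_path) as f:
        return json.loads(f.read().decode('utf-8'))

schema = read_schema('gs://wahtch_dog_dataflow/schema.json')
# Presumably this parsed JSON (or a "field:type,..." string built from it)
# is what gets passed to the BigQuery sink, but I am not sure of the exact format.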
How do I do a transformation? For example, I want to perform a groupby, etc. Is something like the sketch below the Beam equivalent?
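For example, is this roughly the Beam equivalent of a pandas groupby? The field names are only an illustration, not my real columns:

# Toy example: sum `duration` per `user` (placeholder field names).
rows = p | 'Toy rows' >> beam.Create([
    {'user': 'a', 'duration': 10},
    {'user': 'a', 'duration': 5},
    {'user': 'b', 'duration': 7},
])

totals = (
    rows
    | 'Key by user' >> beam.Map(lambda row: (row['user'], row['duration']))
    | 'Sum per user' >> beam.CombinePerKey(sum))   # like df.groupby('user')['duration'].sum()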
Update:
I am able to read the two files from the directory, but each file ends up as its own element in the PCollection. Let me spell out my logical steps: 1) Read two files from the local directory. 2) Join both dataframes using a join operation: this is where I am stuck. 3) Perform some transformations on the joined dataframe. Here is what I have so far:
import pandas as pd

class ReadOrc(beam.DoFn):
    def process(self, element):
        df = pd.read_csv(element)
        yield df
csv_lines = (
    p
    | beam.Create(urls)
    | 'Reading latest file' >> beam.ParDo(ReadOrc())
    | 'transform' >> beam.ParDo(transform()))
The code above reads the two files from the directory, and the resulting PCollection holds the two DataFrames (df1, df2) as separate elements.
Now, in transform, I want to join both DataFrames and do the preprocessing steps. A sketch of what I am imagining follows.
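This is what I am imagining, assuming both files are small enough to fit in memory on one worker and assuming a made-up join key 'id' (I have also seen CoGroupByKey mentioned for joins on keyed records, but since my elements are whole DataFrames I am not sure it applies here):

class JoinAndTransform(beam.DoFn):
    def process(self, dfs):
        # `dfs` is the list containing the two DataFrames emitted by ReadOrc.
        # 'id' is a placeholder for the real join key.
        joined = pd.merge(dfs[0], dfs[1], on='id', how='inner')
        # ...further preprocessing on `joined` would go here...
        yield joined

joined = (
    p
    | beam.Create(urls)
    | 'Read each file' >> beam.ParDo(ReadOrc())
    | 'Collect both dataframes' >> beam.combiners.ToList()   # -> one element: [df1, df2]
    | 'Join and preprocess' >> beam.ParDo(JoinAndTransform()))

Is this the right direction, or is there a more idiomatic way to do the join in Beam?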