
I want to set up an hourly pipeline that parses 2000 raw protobuf-format files sitting in different folders of GCS buckets and loads the data into BigQuery. So far I'm able to parse the proto data successfully.

I know the wildcard method for reading all the files in a folder, but I don't want that here, since my data is spread across different folders and I want this to run faster with parallelism rather than sequentially,

like below

for x, filename in enumerate(file_separated_comma):
    # read data from proto
    # load data to BigQuery

Now I want to know whether the approach below is the best or recommended way to parse multiple files from different folders in Apache Beam and load the data into BigQuery.

One more thing: after parsing each record from proto, I convert it into a JSON-style record to load into BigQuery, and I don't know whether that is a good way to load data into BigQuery or whether I should load the deserialized (parsed) proto data directly.
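To make that concrete, here is a minimal sketch of the conversion I mean (proto_to_bq_row is just an illustrative helper name, and the preserving_proto_field_name flag is only something I'm considering so the dict keys match snake_case column names; it's not what my code below currently does):

from google.protobuf.json_format import MessageToDict
import rtbtracker_log_pb2

def proto_to_bq_row(serialized_log):
    # Deserialize one protobuf record.
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    # MessageToDict returns a plain Python dict (one row for BigQuery).
    # preserving_proto_field_name keeps snake_case field names, assuming the
    # BigQuery columns use the same names.
    return MessageToDict(log, preserving_proto_field_name=True)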

I'm moving from a Hadoop job to Dataflow, and setting up this pipeline is meant to reduce cost.

I'm new to Apache Beam and don't know the pros and cons, so could somebody take a look at the code and help me find a better approach for production?

import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems


def get_deserialized_log(serialized_log):
    # Parse one serialized RtbTrackerLogProto record.
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    return log


def print_row(message):
    # The 4th tab-separated column carries the encoded proto payload.
    message = message[3]
    # Undo the custom URL-safe substitutions before base64-decoding.
    message = message.replace('_', '/')
    message = message.replace('*', '=')
    message = message.replace('-', '+')
    final_binary = base64.b64decode(message)
    msg = get_deserialized_log(final_binary)

    json_obj = MessageToDict(msg)
    # json_obj = MessageToJson(msg)  # alternative: JSON string instead of dict
    return json_obj

def parse_file(element):
    # Split one tab-delimited line into its columns.
    for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
        return line



def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    app_args, pipeline_args = parser.parse_known_args()

    # Pass the remaining args (runner, project, staging, etc.) to the pipeline.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        input_list=app_args.input
        file_list = input_list.split(",")
        res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

        for i, file in enumerate(file_list):
            onesec = p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
            parsingProtoFile = onesec | 'Parse file {}'.format(i) >> beam.Map(parse_file)
            printFileContent = parsingProtoFile | 'Print output {}'.format(i) >> beam.Map(print_row)

            # I want to load to BigQuery here
            # LOAD DATA TO BIGQUERY

            # secondsec = printFileContent | "Write Text {}".format(i) >> beam.io.WriteToText(
            #     "/home/file_{}".format(i), file_name_suffix=".json",
            #     num_shards=1, append_trailing_newlines=True)
        

if __name__ == '__main__':
    run()

I'm running the code locally like this:

python3 another_main.py --input=tracker_one.gz,tracker_two.gz

I haven't mentioned an output path because I don't want to save the data to GCS; I will be loading it into BigQuery.

And like this on the DataflowRunner:

python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing

I noticed that two jobs are running for a single input file under the same job name, and I don't know why that is happening (screenshot attached).


1 Answer


That method of reading files is fine (as long as the number of input files isn't too large). However, if you can express the set of files you want to read as a wildcard expression (which can match against multiple folders), that will likely perform better, and Dataflow will read all the files that match the pattern in parallel.
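For example, a single pattern with wildcards can span the date/hour folders shown in the question, and if one pattern can't cover everything, a PCollection of patterns can be fed into ReadAllFromText. A minimal sketch, assuming pipeline_options is already built and the bucket/paths are placeholders:

import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as p:
    # One wildcard pattern covering many folders; Dataflow reads every
    # matching file in parallel.
    lines = p | "ReadAll" >> beam.io.ReadFromText(
        "gs://bucket/folder/2020/12/23/*/*/*.gz")

    # Alternative when a single pattern isn't enough: a PCollection of patterns.
    patterns = p | "Patterns" >> beam.Create([
        "gs://bucket/folder/2020/12/23/*/*.gz",
        "gs://bucket/folder/2020/12/24/*/*.gz",
    ])
    more_lines = patterns | "ReadEachPattern" >> beam.io.ReadAllFromText()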

For writing to BigQuery, it's best to use the built-in BigQuery sink. The default behavior is to create temp files in JSON format and then load those into BigQuery, but you can also use Avro instead, which can be more efficient. You can also combine all of your inputs into one PCollection using Flatten, so that you only need one BigQuery sink in your pipeline.
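Here is a rough sketch of that shape, reusing parse_file, print_row, file_list, and pipeline_args from the question's code; the table spec and schema string are placeholders, and temp_file_format (for Avro temp files) is only available in reasonably recent Beam releases:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
    branches = []
    for i, pattern in enumerate(file_list):
        rows = (p
                | "Read {}".format(i) >> beam.io.ReadFromText(pattern)
                | "Parse {}".format(i) >> beam.Map(parse_file)
                | "ToRow {}".format(i) >> beam.Map(print_row))  # one dict per record
        branches.append(rows)

    # Merge every branch into a single PCollection so there is one BigQuery sink.
    (tuple(branches)
     | "Flatten" >> beam.Flatten()
     | "WriteToBQ" >> beam.io.WriteToBigQuery(
         "my-project:my_dataset.tracker_logs",           # placeholder table spec
         schema="event_time:TIMESTAMP,payload:STRING",   # placeholder schema
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         temp_file_format="AVRO"))                       # Avro temp files instead of JSON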