I want to set up a pipeline that runs every hour to parse ~2000 raw protobuf-format files from different folders of GCS buckets and load the data into BigQuery. So far I'm able to parse the proto data successfully.
I know the wildcard method for reading all the files in a folder, but I don't want that here because I have data in different folders, and I want the reads to run in parallel rather than sequentially,
like below:

    for x, filename in enumerate(file_separated_comma):
        # read data from proto
        # load data to BigQuery
Now I want to know whether the approach below is the best or recommended way of parsing multiple files from different folders in Apache Beam and loading the data into BigQuery.
One more thing: after parsing each record from proto, I'm converting it into a JSON record to load into BigQuery, and I don't know whether that is also a good way to load data into BigQuery instead of directly loading the deserialized (parsed) proto data.
I'm moving from a Hadoop job to Dataflow to reduce cost by setting up this pipeline.
I'm new to Apache Beam and don't know the pros and cons, so can somebody take a look at the code and help me find a better approach to go to production?
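On the JSON question: `WriteToBigQuery` takes Python dicts as rows, so the `MessageToDict` output can go straight to the sink with no JSON-string step in between. A minimal illustration, using `struct_pb2.Struct` as a stand-in message type since your `RtbTrackerLogProto` isn't available here:

```python
from google.protobuf import struct_pb2
from google.protobuf.json_format import MessageToDict

# Stand-in for a parsed RtbTrackerLogProto; MessageToDict behaves the
# same way on any message type.
msg = struct_pb2.Struct()
msg.update({"campaign_id": "123", "bid_price": 0.25})

row = MessageToDict(msg)  # a plain Python dict, usable as a BigQuery row
```

By contrast, `MessageToJson` returns a string, which would need an extra `json.loads` before the BigQuery sink could use it, so staying with `MessageToDict` is the simpler path.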
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems
def get_deserialized_log(serialized_log):
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    return log
def print_row(message):
    # Column 3 holds the custom URL-safe base64 payload; undo the
    # substitutions before decoding.
    message = message[3]
    message = message.replace('_', '/')
    message = message.replace('*', '=')
    message = message.replace('-', '+')
    final_binary = base64.b64decode(message)
    msg = get_deserialized_log(final_binary)
    jsonObj = MessageToDict(msg)
    return jsonObj
def parse_file(element):
    for line in csv.reader([element], quotechar='"', delimiter='\t',
                           quoting=csv.QUOTE_ALL, skipinitialspace=True):
        return line
def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    app_args, pipeline_args = parser.parse_known_args()
    # Pass the leftover args to Beam so runner/project options reach the pipeline.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        input_list = app_args.input
        file_list = input_list.split(",")
        for i, file in enumerate(file_list):
            onesec = p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
            parsingProtoFile = onesec | 'Parse file {}'.format(i) >> beam.Map(parse_file)
            printFileContent = parsingProtoFile | 'Print output {}'.format(i) >> beam.Map(print_row)
            # I want to load to BigQuery here
            ## LOAD DATA TO BIGQUERY
            # secondsec = printFileContent | "Write Text {}".format(i) >> beam.io.WriteToText(
            #     "/home/file_{}".format(i), file_name_suffix=".json",
            #     num_shards=1, append_trailing_newlines=True)
if __name__ == '__main__':
run()
Running the code locally like this:
python3 another_main.py --input=tracker_one.gz,tracker_two.gz
I haven't mentioned an output path because I don't want to save the data to GCS; I will be loading it into BigQuery.
And running on the DataflowRunner like this:
python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing
I noticed that two jobs end up running for a single input file under the same job name and I don't know why that is happening; please find the attached screenshot for the same.