I am creating a data pipeline using Apache beam to take XML file as input from the google storage bucket (GCS) and converting it into JSON file. I am trying to use the 'xmltodict' library of python to first convert XML into python dict after which I use the python json.dumps() function to convert it into json format. I have created separate beam.DoFn classes for each step for the beam pipeline.
I tested the pipeline on a small file (less than 1MB size) and it worked. The code runs both on directrunner (less than 1 min) and dataflow runner (5-6 mins including the startting and stopping of dataflow job). But when I use the dataflow runner with a bigger file, like (~150 MB), the pipeline keeps on running for close to 1 hour without any progress. I am not to figure out what is wrong.
I think taking the whole file as input in one string will be the issue and if I can somehow read XML file from GCS in a better way so as to parse each record as single record, will solve this issue. I will appreciate any help in making this better.
Below are sample codes:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
class ReadGCSfile(beam.DoFn):
def process(self,element):
from apache_beam.io.gcp import gcsio
gcs = gcsio.GcsIO()
yield(gcs.open(element).read())
# this class converts hyphens(-) in the XML text into (_) so that column name convention is inline with #the BigQuery conventions
class clean_xml(beam.DoFn):
def process(self,element):
if re.search('(<.[^\>]*)-', element) == None:
return[element]
else:
for i in range(len(re.findall('(<.[^\>]*)-', element))):
y = re.search('(<.[^\>]*)-', element)
splitpoint1 = y.span()[0]
splitpoint2 = y.span()[1]
element = element[:splitpoint1] + element[splitpoint1:splitpoint2].replace('-','_') + element[splitpoint2:]
return[(element)]
class createdict(beam.DoFn):
def process(self,element):
import xmltodict
order_data = xmltodict.parse(element)
unnested_data = order_data['root1']['root2']
return[(unnested_data)]
class converttojson(beam.DoFn):
def process(self,element):
import json
import re
for order in element:
order_j = json.dumps(order)
yield(order_j)
def run(argv = None):
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project123'
google_cloud_options.job_name = 'job123'
google_cloud_options.staging_location = 'gs://bucket123/staging'
google_cloud_options.temp_location = 'gs://bucket123/temp'
google_cloud_options.machine_type = 'n1-standard-8'
options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=options)
input_file_path = 'gs://' + input_bucket +'/'+input_file
(p
| 'Create' >> beam.Create([input_file_path])
| 'GetXML' >> beam.ParDo(ReadGCSfile())
| 'Clean_XML' >> beam.ParDo(clean_xml())
| 'CreateDict' >> beam.ParDo(createdict())
| 'Convert2JSON' >> beam.ParDo(converttojson())
| 'write' >> beam.io.WriteToText('gs://bk/output',file_name_suffix='.json',num_shards =1,shard_name_template='')
)
p.run()
WHat improvements can I make in this to make it more efficient for bigger files. The pipeline right now seems to be stuck at GetXML and Clean_XML stage. How can I iterate through the XML file one record at a time?
Below is a sample data file :
<?xml version="1.0" encoding="UTF-8"?>
<root1 xmlns="http://www.example.com">
<root2 ID-no="000000">
<date>2022-09-23T06:58:24.000Z</date>
<created-by>storefront</created-by>
<original-order-no>000000</original-order-no>
<currency>USD</currency>
<invoice-no>11111111</invoice-no>
<customer>
<customer-name>abcccccc</customer-name>
<customer-email>[email protected]</customer-email>
<billing-address>
<address1>20 xyz</address1>
<city>mars</city>
<postal-code>123456</postal-code>
<state-code>hhjbjh</state-code>
<country-code>nm mn</country-code>
</billing-address>
</customer>
<status>
<order-status>NEW</order-status>
<shipping-status>NOT_SHIPPED</shipping-status>
<confirmation-status>CONFIRMED</confirmation-status>
<payment-status>NOT_PAID</payment-status>
</status>
</root2>
</root1>