0 votes

I am creating a data pipeline using Apache Beam that takes an XML file as input from a Google Cloud Storage (GCS) bucket and converts it into a JSON file. I am using the Python 'xmltodict' library to first convert the XML into a Python dict, and then the json.dumps() function to convert that dict into JSON. I have created a separate beam.DoFn class for each step of the pipeline.
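Outside of Beam, the core conversion I am doing per record is essentially this (a minimal standalone sketch with a made-up single-record XML string):

import json
import xmltodict

# xmltodict.parse() turns the XML string into a (nested) dict,
# and json.dumps() serializes that dict as a JSON string.
xml_string = '<root1><root2><currency>USD</currency></root2></root1>'
as_dict = xmltodict.parse(xml_string)
as_json = json.dumps(as_dict['root1']['root2'])
print(as_json)  # {"currency": "USD"}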

I tested the pipeline on a small file (less than 1 MB) and it worked. The code runs both on the DirectRunner (less than 1 min) and the Dataflow runner (5-6 mins, including starting and stopping the Dataflow job). But when I run the Dataflow runner with a bigger file (~150 MB), the pipeline keeps running for close to 1 hour without any progress. I am not able to figure out what is wrong.

I think taking the whole file as input in one string is the issue; if I can somehow read the XML file from GCS in a better way, so that each record is parsed as a single element, that should solve the problem. I would appreciate any help in making this better.
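What I have in mind is roughly something like this (an untested sketch; the tag and namespace are taken from my sample data further down, and I have not gotten it working in the pipeline yet):

import xml.etree.ElementTree as ET

import apache_beam as beam


class ReadXMLRecords(beam.DoFn):
    """Rough idea: stream the GCS file and emit one <root2> element at a time."""
    def process(self, gcs_path):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        with gcs.open(gcs_path) as f:
            # iterparse streams the file instead of loading it all into memory;
            # the tag includes the default namespace from the sample file.
            for _, elem in ET.iterparse(f, events=('end',)):
                if elem.tag == '{http://www.example.com}root2':
                    yield ET.tostring(elem)
                    elem.clear()  # free memory for records already emitted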

Below is the sample code I currently have:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions

class ReadGCSfile(beam.DoFn):
    def process(self, element):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        # Reads the entire file into a single output element.
        yield gcs.open(element).read()

# This class converts hyphens (-) in the XML tags into underscores (_) so that the
# column names follow BigQuery naming conventions.

class clean_xml(beam.DoFn):
    def process(self, element):
        import re
        # Replace each hyphen found inside a tag with an underscore, one match at a time.
        if re.search(r'(<.[^\>]*)-', element) is None:
            return [element]
        else:
            for i in range(len(re.findall(r'(<.[^\>]*)-', element))):
                y = re.search(r'(<.[^\>]*)-', element)
                splitpoint1 = y.span()[0]
                splitpoint2 = y.span()[1]
                element = element[:splitpoint1] + element[splitpoint1:splitpoint2].replace('-', '_') + element[splitpoint2:]
            return [element]

class createdict(beam.DoFn):
    def process(self, element):
        import xmltodict
        order_data = xmltodict.parse(element)
        # Drop the outer wrappers; with multiple <root2> records this is a list of dicts.
        unnested_data = order_data['root1']['root2']
        return [unnested_data]

class converttojson(beam.DoFn):
    def process(self, element):
        import json
        # element is expected to be a list of order dicts (one per record).
        for order in element:
            order_j = json.dumps(order)
            yield order_j

def run(argv=None):

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)  
    google_cloud_options.project = 'project123'
    google_cloud_options.job_name = 'job123'
    google_cloud_options.staging_location = 'gs://bucket123/staging'
    google_cloud_options.temp_location = 'gs://bucket123/temp'
    # machine_type belongs to the worker options, not the GoogleCloudOptions view
    options.view_as(WorkerOptions).machine_type = 'n1-standard-8'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=options)
    input_file_path = 'gs://' + input_bucket +'/'+input_file

    (p
        | 'Create' >> beam.Create([input_file_path])
        | 'GetXML' >> beam.ParDo(ReadGCSfile())
        | 'Clean_XML' >> beam.ParDo(clean_xml())
        | 'CreateDict' >> beam.ParDo(createdict())
        | 'Convert2JSON' >> beam.ParDo(converttojson())
        | 'write' >> beam.io.WriteToText('gs://bk/output', file_name_suffix='.json', num_shards=1, shard_name_template='')
    )
    p.run()

What improvements can I make to handle bigger files more efficiently? The pipeline right now seems to be stuck at the GetXML and Clean_XML stages. How can I iterate through the XML file one record at a time?
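For the Clean_XML step in particular, I was also wondering whether a single re.sub pass over the string would be cheaper than re-scanning the whole document once for every hyphen found. Something like this (untested sketch, roughly the same tag-only replacement as my loop above):

import re

def clean_tag_hyphens(xml_text):
    # Replace hyphens with underscores, but only inside tags (<...>),
    # in one pass instead of one re.search per hyphen.
    return re.sub(r'<[^>]*>',
                  lambda m: m.group(0).replace('-', '_'),
                  xml_text)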

Below is a sample data file:

<?xml version="1.0" encoding="UTF-8"?>
<root1 xmlns="http://www.example.com">
    <root2 ID-no="000000">
        <date>2022-09-23T06:58:24.000Z</date>
        <created-by>storefront</created-by>
        <original-order-no>000000</original-order-no>
        <currency>USD</currency>
        <invoice-no>11111111</invoice-no>
        <customer>
            <customer-name>abcccccc</customer-name>
            <customer-email>[email protected]</customer-email>
            <billing-address>
                <address1>20 xyz</address1>
                <city>mars</city>
                <postal-code>123456</postal-code>
                <state-code>hhjbjh</state-code>
                <country-code>nm mn</country-code>
            </billing-address>
        </customer>
        <status>
            <order-status>NEW</order-status>
            <shipping-status>NOT_SHIPPED</shipping-status>
            <confirmation-status>CONFIRMED</confirmation-status>
            <payment-status>NOT_PAID</payment-status>
        </status>
    </root2>
</root1>
If it is a single large file, I suspect you don't have any parallelism here: there is only one thread/worker processing the data, which is why it is slow. In such a case, the worker is also more likely to get throttled, making the whole pipeline take even longer. – Ruoyun Huang
Is there any way around this? – learning cloud
Ask this question first: is there a way to split this single file into multiple ones and then process them in parallel? If the answer is no, then I don't think there is an easy way around it. – Ruoyun Huang

1 Answer

1 vote

Most likely it is due to a lack of parallelism, so the job cannot finish efficiently. If there is any doubt about whether the job is actually making progress at all, I would suggest adding counters to track it (e.g. the number of records processed so far).

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/counters.py
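
For example, a minimal sketch of a DoFn with a progress counter, using Beam's user-facing Metrics API (the counter namespace and name here are arbitrary):

import apache_beam as beam
from apache_beam.metrics import Metrics


class CountRecords(beam.DoFn):
    """Counts processed elements so progress shows up in the job's metrics."""
    def __init__(self):
        self.records_processed = Metrics.counter(self.__class__, 'records_processed')

    def process(self, element):
        self.records_processed.inc()  # visible under the job's counters in Dataflow
        yield element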