
I am currently using Dataflow to handle/massage an XML text string from PubSub. I was able to successfully run a Dataflow job using DirectRunner for my --runner flag. However, I ran into an issue trying to create a Dataflow resource using the exact same job with DataflowRunner as my flag.

From the error logs (while using DataflowRunner), it seems that the Dataflow template that I've created does not recognize:

import xml.etree.ElementTree as ET

I receive "NameError: name 'ET' is not defined [while running 'generatedPtransform-419']" whenever I reference ET within my pipeline. What is peculiar is that my Dataflow job runs perfectly fine with the DirectRunner which leads me to believe there is an issue with using DataflowRunner to build my template as xml.etree.ElementTree is a simple/native PyPI library.

For my environment, I am working with:
Python 3.7.7
apache-beam 2.22.0

Any help/guidance is much appreciated, thanks!

Working DirectRunner job:

import apache_beam as beam
import argparse, xmltodict, json
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import xml.etree.ElementTree as ET

class FormatMessage(beam.DoFn):
    def process(self, line):
        xml_msg = ET.fromstring(line)

        # Code to construct XML Object (removed) 

        tree_new_xml = ET.ElementTree(element_msg)
        xml_dict = xmltodict.parse(ET.tostring(tree_new_xml.getroot(), encoding='utf8'))
        json_obj = json.dumps(xml_dict).encode('utf8')

        yield json_obj

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic', help='Input topic read data from.', default='')
    parser.add_argument('--output_topic', help='Output topic to write data to.', default='')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions()
    # Pickle the main session so module-level state is shipped to the workers
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Create and implement PubSub-to-PubSub pipeline
    p = beam.Pipeline(options=pipeline_options)
    (p
     | "Read PubSub Message" >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | "Format Msg" >> beam.ParDo(FormatMessage())
     | "Write PubSub Output" >> beam.io.WriteToPubSub(known_args.output_topic)
     )
    p.run().wait_until_finish()

if __name__ == '__main__':
    run()
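
For reference, a minimal sketch of how the runner can be selected through pipeline options rather than command-line flags (the project, region, and bucket names below are placeholders, not values from this post):

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to passing these flags on the command line when launching the job.
dataflow_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                 # placeholder project ID
    '--region=us-central1',                 # placeholder region
    '--temp_location=gs://my-bucket/temp',  # placeholder GCS path
    '--streaming',                          # PubSub sources require streaming mode
])
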
Can you provide a more complete code segment? It will certainly help flesh out where the issue might be. – James Powis

@JamesPowis included the segment in the original post, thanks in advance for taking a look! – Bernard Wong

@BernardWong try doing import xml.etree.ElementTree as ET inside your process function. – rmesteves

@rmesteves thanks! I was able to make that change and have ET recognized. I was also able to set the --requirements_file flag to take in a requirements.txt for my other libraries. It seems that the workers running the functions I specify in the pipeline are not all configured the same? Anyway, I'm now troubleshooting a "Dataflow unable to determine backlog for pubsub subscription" issue. Again, DirectRunner works with the code, but the read from my first PubSub topic is not functioning when using DataflowRunner. – Bernard Wong

@BernardWong I will post this as the answer to the first problem. For the second error, I suggest that you create another post so it stays organized, following the Stack Overflow rules. Anyway, I'm trying to understand what is going on now. – rmesteves

1 Answer


According to this documentation, your problem is due to names that are not available on the Dataflow worker.

Notice that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error is regarding a module that should be available in the Python distribution, you can solve this by importing the module locally, where it is used.

In your case, you have to import the module inside your process function, as shown below:

class FormatMessage(beam.DoFn):
    def process(self, line):
        import xml.etree.ElementTree as ET
        xml_msg = ET.fromstring(line)

        # Code to construct XML Object (removed) 

        tree_new_xml = ET.ElementTree(element_msg)
        xml_dict = xmltodict.parse(ET.tostring(tree_new_xml.getroot(), encoding='utf8'))
        json_obj = json.dumps(xml_dict).encode('utf8')

        yield json_obj
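
As mentioned in the comments above, third-party packages such as xmltodict can additionally be declared in a requirements.txt and installed on the workers via the --requirements_file pipeline option. A minimal sketch, assuming the file sits next to the pipeline script (the version pin is a placeholder):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# requirements.txt contents (placeholder version pin):
#   xmltodict==0.12.0

pipeline_options = PipelineOptions()
# Equivalent to passing --requirements_file=requirements.txt on the command line.
pipeline_options.view_as(SetupOptions).requirements_file = 'requirements.txt'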