I am currently using Dataflow to parse and transform an XML text string from PubSub. I was able to run the Dataflow job successfully with DirectRunner as my --runner flag. However, I ran into an issue trying to create a Dataflow resource from the exact same job with DataflowRunner as the flag.
From the error logs (while using DataflowRunner), it seems that the Dataflow template that I've created does not recognize:
import xml.etree.ElementTree as ET
I receive "NameError: name 'ET' is not defined [while running 'generatedPtransform-419']" whenever I reference ET within my pipeline. What is peculiar is that the same job runs perfectly fine with DirectRunner, which leads me to believe there is an issue with using DataflowRunner to build my template, since xml.etree.ElementTree is a simple module from the Python standard library (not even a third-party PyPI package).
For my environment, I am working with:
Python 3.7.7
apache-beam 2.22.0
Any help/guidance is much appreciated, thanks!
Working DirectRunner job:
import argparse
import json

import apache_beam as beam
import xmltodict
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import xml.etree.ElementTree as ET


class FormatMessage(beam.DoFn):
    """Converts one XML PubSub payload into UTF-8 encoded JSON."""

    def process(self, line):
        xml_msg = ET.fromstring(line)
        # Code to construct XML Object (removed); it builds element_msg from xml_msg
        tree_new_xml = ET.ElementTree(element_msg)
        xml_dict = xmltodict.parse(ET.tostring(tree_new_xml.getroot(), encoding='utf8'))
        json_obj = json.dumps(xml_dict).encode('utf8')
        yield json_obj
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic', help='Input topic to read data from.', default='')
    parser.add_argument('--output_topic', help='Output topic to write data to.', default='')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Forward the remaining flags (--runner, --project, etc.) to Beam
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Create and implement PubSub-to-PubSub pipeline
    p = beam.Pipeline(options=pipeline_options)
    (p
     | "Read PubSub Message" >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | "Format Msg" >> beam.ParDo(FormatMessage())
     | "Write PubSub Output" >> beam.io.WriteToPubSub(known_args.output_topic)
    )
    p.run().wait_until_finish()


if __name__ == '__main__':
    run()
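One thing that may be worth trying, since the NameError points at a module-level name that exists locally but not on the Dataflow worker, is to import xml.etree.ElementTree inside the code that actually runs on the worker, so the name does not depend on the pickled main session being restored. The sketch below is a hypothetical, stdlib-only version of the process() logic: format_message and element_to_dict are illustrative names, not part of the original job, and element_to_dict is a simplified stand-in for xmltodict (it is not equivalent to it in every case).

```python
import json


def format_message(line):
    """Parse one XML payload and return it as UTF-8 encoded JSON bytes.

    Hypothetical sketch: ElementTree is imported inside the function so ET
    is bound at call time on whatever machine runs it, rather than relying
    on a module-level import restored from a pickled main session.
    """
    import xml.etree.ElementTree as ET  # local import, resolved when called

    def element_to_dict(elem):
        # Simplified stand-in for xmltodict (an assumption, not its exact rules):
        # attributes -> "@name" keys, children -> nested values (repeated tags
        # become lists), text -> "#text" when attributes or children exist.
        node = {"@" + k: v for k, v in elem.attrib.items()}
        for child in elem:
            value = element_to_dict(child)[child.tag]
            if child.tag in node:
                if not isinstance(node[child.tag], list):
                    node[child.tag] = [node[child.tag]]
                node[child.tag].append(value)
            else:
                node[child.tag] = value
        text = (elem.text or "").strip()
        if text:
            if node:
                node["#text"] = text
            else:
                return {elem.tag: text}
        return {elem.tag: node or None}

    root = ET.fromstring(line)  # accepts str or bytes (PubSub delivers bytes)
    return json.dumps(element_to_dict(root)).encode("utf-8")
```

Inside FormatMessage.process this would be used as `yield format_message(line)`. The same local-import idea can also be applied by keeping the original xmltodict code and simply moving `import xml.etree.ElementTree as ET` (and `import xmltodict`) into process() or a DoFn setup() method.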