In section 4.5.2 of the Beam programming guide (https://beam.apache.org/documentation/programming-guide/#additional-outputs), a `pvalue.TaggedOutput()` is yielded.
The `pvalue` module seems to be hard to import. I copied the import lines from the Apache documentation, and I use the `--save_main_session` option, `save_main_session=True` in `def run()`, and `pipeline_options.view_as(SetupOptions).save_main_session = save_main_session` before I start the pipeline. All other imports work in all functions, and all classes work in all functions, but `pvalue` does not. I have tried these settings in every possible combination, and also left them out entirely; `pvalue` is always unknown.
I took all my code from the cookbook example here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
Nonetheless, `pvalue` is not defined at run time: `NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']`
This error only occurs with the DataflowRunner, not with the DirectRunner.
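One plausible explanation for the runner difference is ordinary Python name resolution: `Splitter.process` looks up `pvalue` in its module's globals each time it runs, so if the namespace restored on a worker lacks the module-level imports, the lookup fails, whereas an import written inside the function body runs on every call. The sketch below only illustrates that behavior in plain Python; it does not reproduce Dataflow. It uses the standard-library `json` module as a stand-in for `apache_beam.pvalue`, and `run_with_empty_globals` is a hypothetical helper that rebuilds a function with an almost empty globals dict to imitate such a worker.

```python
import builtins
import json  # module-level import, like `from apache_beam import pvalue`
import types


def uses_global_module(text):
    # `json` is looked up in the function's globals at call time.
    return json.dumps({"value": text})


def uses_local_import(text):
    # The import runs inside the function, binding `json` as a local name.
    import json
    return json.dumps({"value": text})


def run_with_empty_globals(func, *args):
    """Hypothetical helper: rebuild `func` with globals holding only builtins.

    This imitates a worker whose namespace is missing the module-level
    imports of the launching script.
    """
    isolated = types.FunctionType(func.__code__, {"__builtins__": builtins})
    return isolated(*args)


if __name__ == "__main__":
    print(uses_global_module("a"))  # works: json is in this module's globals
    try:
        run_with_empty_globals(uses_global_module, "a")
    except NameError as exc:
        print("NameError:", exc)
    print(run_with_empty_globals(uses_local_import, "a"))  # works
```

In the DoFn, the analogous changes would be putting `from apache_beam import pvalue` at the top of `process`, or writing `beam.pvalue.TaggedOutput` through the `beam` import that already works; whether either one avoids the error on Dataflow is an assumption to verify.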
Example of my DoFn:

```python
class Splitter(beam.DoFn):
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')
        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)
```
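Since the NameError only shows up on Dataflow, it can help to confirm the routing itself without any runner. A minimal sketch, assuming the threshold of 100 and the two tag names from `Splitter`: the hypothetical function `choose_tag` holds the decision `process` makes, so `process` could yield `pvalue.TaggedOutput(choose_tag(element), element)` while the function is checked with plain asserts. It reads `element['id']` instead of `element.get('id')`, so a record without an id fails with a `KeyError` rather than the `TypeError` that `None < 100` raises in Python 3.

```python
# Tag names copied from Splitter; the threshold 100 comes from its process() method.
TAG_SMALL = 'kleintje'
TAG_LARGE = 'grootje'


def choose_tag(element, threshold=100):
    """Hypothetical helper: return the output tag Splitter.process would use."""
    # element['id'] raises KeyError for records without an id.
    return TAG_SMALL if element['id'] < threshold else TAG_LARGE


if __name__ == '__main__':
    for record in ({'id': 72234, 'value': 1}, {'id': 23, 'value': 2}):
        print(record['id'], choose_tag(record))
```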
Example of my `run()`:

```python
def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1}'),
        json.loads('{"id":23,"value":2}')
    ]
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    with beam.Pipeline(options=pipeline_options) as p:
        # Splitter only emits tagged outputs, so each one is read back as results[tag].
        results = (
            p
            | beam.Create(sources)
            | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1, Splitter.TAG2))
```
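The `json.loads` calls in `run()` execute while the pipeline is being built, on the machine that launches it, so a malformed literal (for example one missing its closing brace) fails before any runner is involved. A small hypothetical helper such as `parse_sources`, a sketch and not something Beam provides, can parse each string and name the broken entry, which keeps that kind of mistake separate from the worker-side NameError.

```python
import json


def parse_sources(raw_sources):
    """Hypothetical helper: parse JSON strings, naming the first bad entry."""
    parsed = []
    for index, raw in enumerate(raw_sources):
        try:
            parsed.append(json.loads(raw))
        except json.JSONDecodeError as exc:
            # JSONDecodeError subclasses ValueError; re-raise with the entry index.
            raise ValueError(f"source {index} is not valid JSON: {exc.msg}") from exc
    return parsed


if __name__ == '__main__':
    print(parse_sources(['{"id":72234,"value":1}', '{"id":23,"value":2}']))
```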
My imports:

```python
from __future__ import absolute_import
import argparse
import logging
import re
import json
import datetime
from jsonschema import validate
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
```
`from apache_beam import pvalue`, are you doing this as well? – Sam Mason