1 vote

In the documentation here (https://beam.apache.org/documentation/programming-guide/#additional-outputs), at section 4.5.2, a pvalue.TaggedOutput() is yielded.

The pvalue name seems to be hard to import. I copied the import lines from the Apache documentation, and I use the --save_main_session option, as well as save_main_session=True in def run() and pipeline_options.view_as(SetupOptions).save_main_session = save_main_session before I start the pipeline. All imports work for all functions, and all classes work in all functions, but not pvalue. I have also tried these in every possible combination, as well as leaving them out; pvalue is always unknown.

I take all my code from the cookbook here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py

Nonetheless, there is no pvalue:
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']

This error is only raised when I use the DataflowRunner, not when I use the DirectRunner.

Example of my DoFn

class Splitter(beam.DoFn):

    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

Example of my run()

def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1'),
        json.loads('{"id":23,"value":2}')
        ]

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        # the chain must start from p; the main output tag may not repeat a declared tag
        results = (
            p
            | beam.Create(sources)
            | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1, Splitter.TAG2))
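
For reference, a minimal sketch of how the tagged outputs could then be consumed inside the same with block (the Map(print) steps are stand-ins for real sinks, not part of my actual pipeline):

        # each tag declared in with_outputs is addressable as a PCollection
        results[Splitter.TAG1] | 'print kleintje' >> beam.Map(print)
        results[Splitter.TAG2] | 'print grootje' >> beam.Map(print)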

** my imports **

from __future__ import absolute_import

import argparse
import logging
import re
import json
import datetime
from jsonschema import validate

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
Comments:

"the source has from apache_beam import pvalue, are you doing this as well?" – Sam Mason

"yes, I've added my imports, maybe I did it wrong somehow" – Thijs

"Have you tried using it like this: import apache_beam and then apache_beam.pvalue.TaggedOutput? I am wondering if something changes pvalue somehow. Or you could use beam.pvalue.TaggedOutput." – Alex Amato
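
A minimal sketch of Alex Amato's suggestion, referencing TaggedOutput through the already-imported beam module instead of the standalone pvalue name (tags taken from the question):

import apache_beam as beam

class Splitter(beam.DoFn):
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        # beam.pvalue is reachable through the beam import itself,
        # so no separate module-level pvalue name is needed
        if element.get('id') < 100:
            yield beam.pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield beam.pvalue.TaggedOutput(self.TAG2, element)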

2 Answers

2 votes

You should try importing pvalue inside your Splitter class's process method, because with Apache Beam the dependencies have to be resolvable inside the functions and classes that actually run on the workers.

Your code would then look like this:

class Splitter(beam.DoFn):

    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        # import inside the method so the name resolves on the remote
        # workers (a class-level import only creates a class attribute,
        # which is not visible from inside process)
        from apache_beam import pvalue

        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

You are able to use from apache_beam import pvalue at module level with the DirectRunner because the code runs locally in a single process; with the DataflowRunner, however, your functions are serialized and executed on remote workers, so the code has to be structured so that its dependencies can be resolved there.
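
For completeness, the other common way to make module-level imports available on the workers is to pickle the main session; a minimal sketch, assuming the standard SetupOptions API (note that the question reports trying this without success, so the local import above is the more robust route):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = PipelineOptions(pipeline_args)
# serialize the state of the main module (including its imports)
# and restore it on the Dataflow workers
pipeline_options.view_as(SetupOptions).save_main_session = True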

0 votes

Somehow the dependencies on the DataflowRunner workers got messed up. By installing a wrong set of dependencies and then removing them again, things suddenly started working. Importing with from apache_beam import pvalue seems to be right after all.

Maybe the lesson learned here is that dependencies can end up corrupted, and you can fix them with a forced reinstall (for example pip install --force-reinstall apache-beam), triggered in my case by installing and then uninstalling older or wrong apache_beam packages.