2
votes

I am experimenting with creating my own templates for Google Cloud Dataflow, so that jobs can be launched from the GUI and are easier for others to run. I have followed the tutorials, created my own PipelineOptions subclass, and populated it with the parser.add_value_provider_argument() method. When I then try to pass these arguments into the pipeline using my_options.argname.get(), I get an error telling me the item is not called from a runtime context. I don't understand this. The args aren't part of defining the pipeline graph itself; they are just parameters such as the input filename, output table name, etc.

Below is the code. It works if I hardcode the input filename, output table name, write disposition, and delimiter. If I replace these with their my_options.argname.get() equivalents, it fails. In the snippet shown, I have hardcoded everything except the output table name, where I use my_options.outputBQTable.get(). This fails with the following message.

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: outputBQTable, type: str, default_value: 'dataflow_csv_reader_testing.names').get() not called from a runtime context

I appreciate any guidance on how to get this to work.

import apache_beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider
import csv
import argparse

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls,parser):
        parser.add_value_provider_argument('--inputGCS', type=str,
              default='gs://mybucket/df-python-csv-test/test-dict.csv',
              help='Input gcs csv file, full path and filename')
        parser.add_value_provider_argument('--delimiter', type=str,
              default=',',
              help='Character used as delimiter in csv file, default is ,')
        parser.add_value_provider_argument('--outputBQTable', type=str,
              default='dataflow_csv_reader_testing.names',
              help='Output BQ Dataset.Table to write to')
        parser.add_value_provider_argument('--writeDisposition', type=str,
              default='WRITE_APPEND',
              help='BQ write disposition, WRITE_TRUNCATE or WRITE_APPEND or WRITE_EMPTY')

def main():
    optlist=PipelineOptions()
    my_options=optlist.view_as(MyOptions)
    p = apache_beam.Pipeline(options=optlist)
    (p
    | 'create'            >> apache_beam.Create(['gs://mybucket/df-python-csv-test/test-dict.csv'])
    | 'read gcs csv dict' >> apache_beam.FlatMap(lambda file: csv.DictReader(apache_beam.io.gcp.gcsio.GcsIO().open(file,'r'), delimiter='|'))
    | 'write bq record'   >> apache_beam.io.Write(apache_beam.io.BigQuerySink(my_options.outputBQTable.get(), write_disposition='WRITE_TRUNCATE'))
    )
    p.run()

if __name__ == '__main__':
    main()
2
I found a similar issue from last year, posted here on StackOverflow. It states that ValueProviders (templated arguments) in Python are only available for regular file I/O, not for things like BigQuery. It is (or was) a limitation of the Python SDK. Does anyone know the status of this? stackoverflow.com/questions/47134847/… - mqunell

2 Answers

2
votes

You cannot call my_options.outputBQTable.get() while constructing the pipeline, because the value is not available until the job actually runs. The BigQuery sink already knows how to use runtime-provided arguments, so I think you can just pass my_options.outputBQTable itself.

From what I gather from the documentation, you should only call options.runtime_argument.get() inside the process methods of the DoFns you pass to ParDo steps.

Note: I tested with 2.8.0 of the Apache Beam SDK and so I used WriteToBigQuery instead of BigQuerySink.

0
votes

This is a feature yet to be developed for the Python SDK.

The related open issue can be found at the Apache Beam project page.

Until that issue is resolved, the workaround is to use the Java SDK.