I am experimenting with creating my own templates for Google Cloud Dataflow, so that jobs can be launched from the GUI, making them easier for others to run. I have followed the tutorials, created my own subclass of PipelineOptions, and populated it using parser.add_value_provider_argument(). When I then try to pass these arguments into the pipeline with my_options.argname.get(), I get an error telling me the value is not being accessed from a runtime context. I don't understand this: the args aren't part of defining the pipeline graph itself, they are just parameters such as the input filename, the output table name, and so on.
Below is the code. It works if I hardcode the input filename, output table name, write disposition, and delimiter. If I replace any of these with its my_options.argname.get() equivalent, it fails. In the snippet shown, I have hardcoded everything except the output table name, where I use my_options.outputBQTable.get(). That fails with the following message:
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: outputBQTable, type: str, default_value: 'dataflow_csv_reader_testing.names').get() not called from a runtime context
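If I read the error correctly, .get() may only be called from code that actually executes on the workers at run time, such as a DoFn's process method. Below is a minimal sketch of that pattern as I understand it (TagWithTableFn and its wiring are illustrative, not part of my actual code):

import apache_beam

class TagWithTableFn(apache_beam.DoFn):
    """Illustrative only: defers .get() until the pipeline is running."""
    def __init__(self, table_vp):
        # At graph-construction time, store the ValueProvider object itself.
        self.table_vp = table_vp

    def process(self, element):
        # process() runs on the workers, i.e. in a runtime context,
        # so calling .get() here is allowed.
        yield (self.table_vp.get(), element)

What confuses me is that my outputBQTable isn't used inside any DoFn: it is consumed by the sink when the graph is built, which seems to be exactly where .get() is forbidden.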
I appreciate any guidance on how to get this to work.
import csv

import apache_beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--inputGCS', type=str,
            default='gs://mybucket/df-python-csv-test/test-dict.csv',
            help='Input gcs csv file, full path and filename')
        parser.add_value_provider_argument(
            '--delimiter', type=str,
            default=',',
            help='Character used as delimiter in csv file, default is ,')
        parser.add_value_provider_argument(
            '--outputBQTable', type=str,
            default='dataflow_csv_reader_testing.names',
            help='Output BQ Dataset.Table to write to')
        parser.add_value_provider_argument(
            '--writeDisposition', type=str,
            default='WRITE_APPEND',
            help='BQ write disposition, WRITE_TRUNCATE or WRITE_APPEND or WRITE_EMPTY')

def main():
    optlist = PipelineOptions()
    my_options = optlist.view_as(MyOptions)
    p = apache_beam.Pipeline(options=optlist)
    (p
     | 'create' >> apache_beam.Create(['gs://mybucket/df-python-csv-test/test-dict.csv'])
     | 'read gcs csv dict' >> apache_beam.FlatMap(
           lambda file: csv.DictReader(GcsIO().open(file, 'r'), delimiter='|'))
     # The next step is what raises the RuntimeValueProviderError shown above:
     # .get() is evaluated while the graph is being constructed, not at run time.
     | 'write bq record' >> apache_beam.io.Write(
           apache_beam.io.BigQuerySink(my_options.outputBQTable.get(),
                                       write_disposition='WRITE_TRUNCATE'))
    )
    p.run()

if __name__ == '__main__':
    main()
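For what it's worth, my current guess is that the sink needs to receive the ValueProvider object itself instead of the result of .get(). Here is a sketch of what I imagine that would look like with apache_beam.io.WriteToBigQuery, which as far as I can tell from the docs accepts a ValueProvider for the table argument (I have not verified this, and I don't know which SDK versions support it):

import apache_beam

# Hypothetical replacement for the final write step above:
# pass the ValueProvider so the table name is resolved at run time.
def make_bq_write(my_options):
    return apache_beam.io.WriteToBigQuery(
        table=my_options.outputBQTable,  # ValueProvider, no .get() here
        write_disposition=apache_beam.io.BigQueryDisposition.WRITE_TRUNCATE)

Is that the right direction, or am I misunderstanding what a runtime context is?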