Whenever I try to stage my pipeline as a template using add_value_provider_argument, following the instructions here: https://cloud.google.com/dataflow/docs/templates/creating-templates, the pipeline tries to run immediately and fails with an error instead of uploading the template to the GCS bucket.
Here is the code I'm deploying:
#deploy
python -m main \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$DATAFLOW_BUCKET/staging \
--temp_location gs://$DATAFLOW_BUCKET/temp \
--output gs://$DATAFLOW_BUCKET/output \
--template_location gs://$DATAFLOW_BUCKET/templates/$TEMPLATE_NAME
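(For completeness, once a template does get staged, I launch it with a different date roughly like this; the job name below is just a placeholder:)

#run template
gcloud dataflow jobs run test-template-run \
--gcs-location gs://$DATAFLOW_BUCKET/templates/$TEMPLATE_NAME \
--parameters date=2018-10-01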
#pipeline.py
import datetime

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.bigquery import BigQueryDisposition, BigQueryWriteFn
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions, StandardOptions)

# JsonCoder, the parse/clean functions and the *_ID / *_BUCKET / SCHEMA
# constants are defined elsewhere in my project (omitted here).

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(  # NOTE: with plain add_argument the template uploads fine
            '--date',
            required=False,
            default='2018-09-28',
            help='Date to process, e.g. 2018-09-28'
        )

RUNNER_TYPE = 'DataflowRunner'
version = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(MyOptions)

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = 'test-v{}'.format(version)
google_cloud_options.staging_location = 'gs://{}/staging'.format(STAGING_BUCKET)
google_cloud_options.temp_location = 'gs://{}/temp'.format(STAGING_BUCKET)
pipeline_options.view_as(StandardOptions).runner = RUNNER_TYPE
pipeline_options.view_as(StandardOptions).streaming = False

# installing packages used in the process
setup_options = pipeline_options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'
setup_options.save_main_session = False

def run(argv=None):
    with beam.Pipeline(options=pipeline_options) as p:
        read_file = 'gs://{}/{}-*'.format(DATA_BUCKET, custom_options.date)

        data = (p
                | 'Read' >> ReadFromText(read_file, coder=JsonCoder())
                | 'ParseData' >> beam.ParDo(parse_data)
                | 'FragmentData' >> beam.ParDo(fragment)
                | 'CleanHeader' >> beam.ParDo(clean_header)
                | 'RemoveMalformedRows' >> beam.ParDo(remove_malformed_rows)
                | 'ZipData' >> beam.ParDo(zip_data)
                | 'FilterFields' >> beam.ParDo(filter_fields)
                )

        bigquery_write_fn = BigQueryWriteFn(
            table_id=TABLE_ID, dataset_id=DATASET_ID, project_id=PROJECT_ID,
            batch_size=100, schema=SCHEMA,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_APPEND,  # or WRITE_TRUNCATE
            client=None)

        data | 'WriteToBigQuery' >> beam.ParDo(bigquery_write_fn)

if __name__ == '__main__':
    run()
The error:
IOError: No files found based on the file pattern gs://<BUCKET>/RuntimeValueProvider(option: date, type: str, default_value: '2018-09-28')-*
The odd thing is that when I use parser.add_argument instead of parser.add_value_provider_argument, the template uploads to GCS, but then I can't change the parameter from its default value.
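For reference, the only difference in the version that does upload is that single parser call; everything else stays the same:

    parser.add_argument(
        '--date',
        required=False,
        default='2018-09-28',
        help='Date to process, e.g. 2018-09-28'
    )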
Why does the pipeline execute instead of staging the template when I change the parser call from add_argument to add_value_provider_argument?
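My (possibly wrong) understanding is that a ValueProvider is only meant to be resolved at execution time, e.g. via .get() inside a DoFn, along these lines (LogDateFn is just an illustration, not part of my pipeline):

import apache_beam as beam

class LogDateFn(beam.DoFn):
    # illustrative only: shows deferred access to a ValueProvider
    def __init__(self, date_vp):
        self.date_vp = date_vp  # keep the ValueProvider itself, not a string

    def process(self, element):
        date = self.date_vp.get()  # .get() is only valid at execution time
        yield (date, element)

Given the error above, it looks like formatting custom_options.date into the read_file string forces it to be evaluated while the template is being built. If that is what's happening, how am I supposed to parameterize the ReadFromText pattern in a template?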