2
votes

Whenever I try to stage my pipeline as a template using add_value_provider_argument, per the instructions here: https://cloud.google.com/dataflow/docs/templates/creating-templates, the pipeline actually executes and raises an error instead of uploading the template to the GCS bucket.

Here is the code I'm deploying:

#deploy

python -m main \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$DATAFLOW_BUCKET/staging \
--temp_location gs://$DATAFLOW_BUCKET/temp \
--output gs://$DATAFLOW_BUCKET/output \
--template_location gs://$DATAFLOW_BUCKET/templates/$TEMPLATE_NAME

#pipeline.py

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(  # was: parser.add_argument
        '--date',
        required=False,
        default='2018-09-28',
        help='Date to process, e.g. 2018-09-28')


RUNNER_TYPE = 'DataflowRunner'

version = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(MyOptions)

options = PipelineOptions()

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = 'test-v{}'.format(version)
google_cloud_options.staging_location = 'gs://{}/staging'.format(STAGING_BUCKET)
google_cloud_options.temp_location = 'gs://{}/temp'.format(STAGING_BUCKET)
pipeline_options.view_as(StandardOptions).runner = RUNNER_TYPE
pipeline_options.view_as(StandardOptions).streaming = False

#installing packages used in process
setup_options = pipeline_options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'
setup_options.save_main_session = False       

def run(argv=None):

    with beam.Pipeline(options=pipeline_options) as p: 

        read_file = 'gs://{}/{}-*'\
                            .format(DATA_BUCKET,custom_options.date)


        data = (p | 'Read' >> ReadFromText(read_file,coder=JsonCoder())
                  | 'ParseData' >> beam.ParDo(parse_data)
                  | 'FragmentData' >> beam.ParDo(fragment)
                  | 'CleanHeader' >> beam.ParDo(clean_header)
                  | 'RemoveMalformedRows' >> beam.ParDo(remove_malformed_rows)
                  | 'ZipData' >> beam.ParDo(zip_data)
                  | 'FilterFields' >> beam.ParDo(filter_fields)   
        )

        bigquery_write_fn = BigQueryWriteFn(table_id=TABLE_ID,dataset_id=DATASET_ID,
                                        project_id=PROJECT_ID,batch_size=100,schema=SCHEMA,
                                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                        write_disposition=BigQueryDisposition.WRITE_APPEND, #WRITE_TRUNCATE, WRITE_APPEND
                                        client=None)        

        data | 'WriteToBigQuery' >> beam.ParDo(bigquery_write_fn)           

if __name__ == '__main__':
    run()

The error:

IOError: No files found based on the file pattern gs://<BUCKET>/RuntimeValueProvider(option: date, type: str, default_value: '2018-09-28')-*

The odd thing is that when I use parser.add_argument instead of parser.add_value_provider_argument, the template uploads to GCS, but then I can't change the parameter from its default at runtime.

Why does the pipeline execute instead of uploading the template when I change the parser call from add_argument to add_value_provider_argument?


1 Answer

3
votes

I ran into the same problem recently. The cause is that ValueProvider values are not accessible during pipeline construction; they are only resolved at runtime. In the Python SDK this means you cannot specify file names, or build dynamic file names, by string-formatting a RuntimeValueProvider while the pipeline graph (or template) is being built.
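
To make this concrete, here is a minimal sketch of the difference (not the original poster's code; the option name and bucket are placeholders). Formatting the provider into a string happens while the graph is built, which is exactly how the RuntimeValueProvider(...) repr ends up inside the file pattern in the error above, whereas passing the provider object itself to ReadFromText defers resolution until the job actually runs.

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

class SketchOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    # full GCS pattern, supplied when the template is run
    parser.add_value_provider_argument(
        '--input',
        default='gs://my-bucket/default-*',
        help='Full input file pattern (placeholder)')

pipeline_options = PipelineOptions()
sketch_options = pipeline_options.view_as(SketchOptions)

# Broken: str.format() runs at graph-construction time, before the provider
# has a value, so its repr() gets baked into the path.
broken_pattern = 'gs://my-bucket/{}-*'.format(sketch_options.input)

# Works: ReadFromText accepts a ValueProvider and resolves it at run time.
p = beam.Pipeline(options=pipeline_options)
lines = p | 'Read' >> ReadFromText(sketch_options.input)
# p.run() with --template_location set stages the template to GCS
# instead of launching a job.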

What you need to do instead is add a value-provider argument that holds the full input path (replacing the read_file variable) and pass that ValueProvider directly to ReadFromText, which accepts it and resolves it when the job runs.

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        '--input',
        required=False,
        default='',
        help='Full path to input file')


RUNNER_TYPE = 'DataflowRunner'

version = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(MyOptions)


google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = 'test-v{}'.format(version)
google_cloud_options.staging_location = 'gs://{}/staging'.format(STAGING_BUCKET)
google_cloud_options.temp_location = 'gs://{}/temp'.format(STAGING_BUCKET)
pipeline_options.view_as(StandardOptions).runner = RUNNER_TYPE
pipeline_options.view_as(StandardOptions).streaming = False

#installing packages used in process
setup_options = pipeline_options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'
setup_options.save_main_session = False       

def run():

    with beam.Pipeline(options=pipeline_options) as p: 

        data = (p | 'Read' >> ReadFromText(custom_options.input,coder=JsonCoder())
                  | 'ParseData' >> beam.ParDo(parse_data)
                  | 'FragmentData' >> beam.ParDo(fragment)
                  | 'CleanHeader' >> beam.ParDo(clean_header)
                  | 'RemoveMalformedRows' >> beam.ParDo(remove_malformed_rows)
                  | 'ZipData' >> beam.ParDo(zip_data)
                  | 'FilterFields' >> beam.ParDo(filter_fields)   
        )

        bigquery_write_fn = BigQueryWriteFn(table_id=TABLE_ID,dataset_id=DATASET_ID,
                                        project_id=PROJECT_ID,batch_size=100,schema=SCHEMA,
                                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                        write_disposition=BigQueryDisposition.WRITE_APPEND, #WRITE_TRUNCATE, WRITE_APPEND
                                        client=None)        

        data | 'WriteToBigQuery' >> beam.ParDo(bigquery_write_fn)           

if __name__ == '__main__':
    run()
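
For completeness, once the template has been staged with the deploy command from the question, a run that supplies the runtime value for --input would look roughly like this (the job name, bucket variables and the date pattern are placeholders):

gcloud dataflow jobs run test-run-1 \
--gcs-location gs://$DATAFLOW_BUCKET/templates/$TEMPLATE_NAME \
--parameters "input=gs://$DATA_BUCKET/2018-09-28-*"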