
I am running a GCP Dataflow pipeline deployed as a template on GCP. The pipeline needs to run a BigQuery read statement whose filter value must be passed in dynamically. How do I do so?

The query I want to run is something like

select * from table_name where field1=[dynamic_value]

Here is the sample code to run the query

    import sys

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Custom options class exposing the dynamic parameter.
    class MyOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument('--dynamic_value', type=str)

    query_string = "select * from table_name where field1=[dynamic_value]"

    options = {
        'project': PROJECT_ID,
        'runner': 'Direct',
        'temp_location': 'gs://my_bucket/temp',
        'region': 'australia-southeast1',
    }
    pipeline_options = PipelineOptions(sys.argv, **options)
    custom_options = pipeline_options.view_as(MyOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        sample = (
                pipeline
                | 'QueryTable' >> beam.io.ReadFromBigQuery(query=query_string, use_standard_sql=False)
                | 'Processing' >> beam.ParDo(MyPreprocessor())
                | beam.Map(print))

I need to pass the dynamic_value from a command line option --dynamic_value. I can pass it as a sys.argv parameter when running the pipeline directly, but once it is deployed as a template the dynamic_value is expected as a PipelineOption. How do I build the query dynamically? Surprisingly, beam.io.ReadFromBigQuery has a mechanism for taking a dataset, project_id and table as parameters, but no parameter for specifying a filter on the data. Querying the entire table is unnecessary and expensive. Can someone suggest a solution?

Edit your question and show the template that matches your code. Read this link for tips on how to create good questions: stackoverflow.com/help/how-to-ask - John Hanley
Did you try to pass the dynamic_value as a pipeline parameter, read the value from the pipeline options, and build your query_string with it before starting your pipeline? - guillaume blaquiere
@guillaume-blaquiere If we pass it as a pipeline parameter, then during template deployment the parameter is expected to be present, which is not what we need: we need to pass the parameter at run time. It cannot be used directly in string concatenation, since runtime value providers can only be resolved inside ParDos, and ReadFromBigQuery does not take side inputs, so we can't use a ParDo there. - Raj Oberoi
Did you have a look at the ValueProvider pattern? cloud.google.com/dataflow/docs/guides/templates/… - guillaume blaquiere

1 Answer


You should look into Flex Templates, which support fully dynamic configuration of queries. With a Flex Template the pipeline graph is built at launch time, so you can read --dynamic_value from the pipeline options and assemble the query as an ordinary string before the pipeline is constructed.
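A minimal sketch of that launch-time approach (the option name --dynamic_value and column field1 come from the question; `build_query` is a hypothetical helper, and the value should be validated before interpolation to avoid SQL injection):

```python
import argparse


def build_query(dynamic_value):
    # Assumes field1 is a string column; validate/escape the value first.
    return "select * from table_name where field1='{}'".format(dynamic_value)


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--dynamic_value', required=True)
    known_args, beam_args = parser.parse_known_args(argv)

    query = build_query(known_args.dynamic_value)

    # In the Flex Template container this module runs at launch time, so
    # by the time the pipeline is constructed the query is a plain string:
    #
    #   import apache_beam as beam
    #   from apache_beam.options.pipeline_options import PipelineOptions
    #   with beam.Pipeline(options=PipelineOptions(beam_args)) as p:
    #       (p
    #        | 'QueryTable' >> beam.io.ReadFromBigQuery(
    #              query=query, use_standard_sql=False)
    #        | beam.Map(print))
    return query
```

The job is then launched with something like `gcloud dataflow flex-template run ... --parameters dynamic_value=<value>`, and the parameter only needs to exist at run time, not at template build time.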