0 votes

I'm working on Dataflow and have already built a custom pipeline with the Python SDK. I would like to pass parameters from the Dataflow UI into my pipeline using Additional Parameters, as described in https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue


So I changed add_argument to add_value_provider_argument, following the Google docs:

class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input_topic",
            type=str,
        )
        parser.add_value_provider_argument(
            "--window_size",
            type=int,
            default=5,
        )

def run():
    pipeline_options = PipelineOptions(pipeline_args, .....)
    custom_param = pipeline_options.view_as(CustomParams)
    .....
    pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)

After that, I tried to create a template and upload it to GCP. The upload script looks like this:

  python custom_pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

But I got this error when creating the template for upload to GCS:

TypeError: expected string or bytes-like object

at the line calling beam.io.ReadFromPubSub().

It looks like what I get back from add_value_provider_argument is a RuntimeValueProvider object, so I'm quite confused about what I have to do to fix this.

I tried to fix the problem by casting the data type:

beam.io.ReadFromPubSub(str(custom_param.input_topic))

But then I got this error:

ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").
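The second error shows what str() is actually doing: custom_param.input_topic is a RuntimeValueProvider wrapper whose real value only exists when the job runs, so stringifying it at template-construction time just produces the wrapper's description, never the topic string. The sketch below is a simplified, hypothetical stand-in for the wrapper (not Beam's actual implementation) that illustrates this deferred-value pattern:

```python
class FakeRuntimeValueProvider:
    """Simplified stand-in for Beam's RuntimeValueProvider (illustration only)."""

    def __init__(self, option_name, value_type, default_value):
        self.option_name = option_name
        self.value_type = value_type
        self.default_value = default_value
        self._runtime_value = None  # only populated once the job actually runs

    def set_runtime_value(self, value):
        # In a real templated job, the Dataflow service supplies the
        # Additional Parameter value here at launch time.
        self._runtime_value = value

    def get(self):
        # ValueProvider-aware transforms call .get() at run time,
        # not while the template is being constructed.
        if self._runtime_value is None:
            raise RuntimeError(
                "%s is not available at template construction time" % self.option_name
            )
        return self._runtime_value

    def __str__(self):
        # This is why str(...) produced the repr-like text in the error above.
        return "RuntimeValueProvider(option: %s, type: %s, default_value: %r)" % (
            self.option_name,
            self.value_type.__name__,
            self.default_value,
        )


topic = FakeRuntimeValueProvider("input_topic", str, None)
print(str(topic))  # the wrapper's description, not a real topic path
```

Because ReadFromPubSub validates its topic argument as a plain "projects/<project>/topics/<topic>" string when the pipeline is constructed, neither the wrapper nor its str() can ever pass that check; only transforms that hold on to the provider and call .get() at run time can consume it.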

So please, does anyone have a fix for this? I have no idea how to proceed.

It seems that the ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread? – Nick_Kh
@mk_sta So that means there is no way to use Additional Parameters in my pipeline, right? It looks like Additional Parameters sends the parameter as a RuntimeValueProvider and ReadFromPubSub doesn't support it. – zzob
I haven't checked this yet; I will do further research and update here. – Nick_Kh

1 Answer

3 votes

As mentioned by @mk_sta

It seems that ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread?

and explained in that thread, ReadFromPubSub does not currently accept ValueProvider arguments, since it is implemented as a native transform in Dataflow.

You can check I/O methods that accept runtime parameters for the state of ValueProvider support across the different SDKs.

So at the moment, if you switch from the Python SDK to the Java SDK, the Read transform of PubSubIO does support ValueProvider.