I'm working on Dataflow and have already built my custom pipeline with the Python SDK. I would like to pass parameters from the Dataflow UI into my custom pipeline using Additional Parameters, as described in https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue
Following the Google docs, I changed add_argument to add_value_provider_argument:
class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input_topic",
            type=str,
        )
        parser.add_value_provider_argument(
            "--window_size",
            type=int,
            default=5,
        )
def run():
    pipeline_options = PipelineOptions(pipeline_args, .....)
    custom_param = pipeline_options.view_as(CustomParams)
    .....
    pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)
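For context on what the option actually holds at this point: a value provided via add_value_provider_argument is only resolved on the workers at job runtime, by calling .get() on it; at template-construction time there is no value yet. Below is a pure-Python sketch of that contract using a hypothetical stand-in class (the real one is apache_beam.options.value_provider.RuntimeValueProvider):

```python
# Stand-in class, NOT the real Beam API -- it only mimics the contract:
# get() fails at template-construction time and succeeds once the
# launch parameter has been injected at worker runtime.
class FakeRuntimeValueProvider:
    runtime_value = None  # populated only when the job actually runs

    def get(self):
        if FakeRuntimeValueProvider.runtime_value is None:
            raise RuntimeError("value not available at template construction time")
        return FakeRuntimeValueProvider.runtime_value


provider = FakeRuntimeValueProvider()

# Template-construction time: no value yet, get() raises.
try:
    provider.get()
except RuntimeError as e:
    print(e)

# Worker runtime: the launch parameter has been set, get() works.
FakeRuntimeValueProvider.runtime_value = "projects/p/topics/t"
print(provider.get())
```

This is why a transform has to explicitly support ValueProvider (deferring .get() until runtime) for such an argument to work.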
After that, I tried to create a template on GCP. The upload script looks like this:
python custom_pipeline.py \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
But I got this error when creating the template for upload to GCS:
TypeError: expected string or bytes-like object
at the line beam.io.ReadFromPubSub(). It looks like what I get back from add_value_provider_argument is a RuntimeValueProvider object, so I'm quite confused about what I have to do to fix this.
I tried to fix the problem by casting the data type:
beam.io.ReadFromPubSub(str(custom_param.input_topic))
But I got this error:
ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").
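The second error is expected: str() on an object with no __str__ falls back to its __repr__, so the cast produces the provider's repr text rather than the topic it will eventually hold. A small illustration with a hypothetical stand-in class that mimics Beam's repr format:

```python
# FakeProvider is a hypothetical stand-in; it only reproduces the repr
# format of Beam's RuntimeValueProvider to show what str() returns.
class FakeProvider:
    def __repr__(self):
        return "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')"


p = FakeProvider()
# str() falls back to __repr__, so this can never match the required
# projects/<project>/topics/<topic> pattern -- hence the ValueError.
print(str(p))
```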
So please, does anyone have a fix for this? I have no idea how to proceed.
ReadFromPubSub doesn't accept a ValueProvider. Have you checked this Stack thread? – Nick_Kh
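Since ReadFromPubSub in the Python SDK does not accept a ValueProvider for the topic, a classic template can only bake the topic in as a plain string at template-creation time. One common workaround is to parse the topic as an ordinary argparse argument (sketch below, with hypothetical flag names) and build one template per topic; runtime-chosen topics generally require a Flex Template instead, which templatizes regular options.

```python
# Sketch (hypothetical flag names): parse the topic as a plain build-time
# argument so ReadFromPubSub receives an ordinary string, not a ValueProvider.
import argparse


def parse_topic(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        type=str,
        required=True,
        help="Full topic path: projects/<project>/topics/<topic>",
    )
    # Unrecognized flags (runner, project, ...) are passed through to Beam.
    args, beam_args = parser.parse_known_args(argv)
    return args.input_topic, beam_args


topic, beam_args = parse_topic(
    ["--input_topic", "projects/my-project/topics/my-topic",
     "--runner", "DataflowRunner"]
)
print(type(topic).__name__)  # an ordinary str, safe to hand to ReadFromPubSub
print(beam_args)             # leftover flags for PipelineOptions
```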