I'm trying to create a custom template for Google Cloud Dataflow. I simply want to print messages from Pub/Sub to the console. When I try to stage my template, I get an error saying that Cloud Pub/Sub is only available for streaming pipelines, even though my pipeline is intended to be a streaming pipeline. What am I doing that makes my pipeline batch instead of streaming?

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrintExample(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--welcome', type=str)


TOPIC = ...
PROJECT = ...
BUCKET = ...


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1'
)

with beam.Pipeline(options=pipeline_options) as p:
    options = pipeline_options.view_as(PrintExample)
    (
        p
        | "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
        | "Print" >> beam.Map(print)
    )
    # No explicit p.run() here: the with block runs the pipeline on exit.

Then I run

python -m PrintTemplate \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate

Resulting in:

ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.

1 Answer

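You were almost there. Beam pipelines run in batch mode unless told otherwise, so the Pub/Sub source rejects your pipeline. Just add --streaming to your command.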

python -m PrintTemplate \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate \
    --streaming

Since you're already constructing PipelineOptions in code, you could also pass streaming=True there instead.

pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1',
    streaming=True
)