I'm trying to create a custom template for Google Cloud Dataflow. I simply want to print some messages from Pub/Sub to the console. When I try to stage my template, I get an error that Cloud Pub/Sub is only available for streaming pipelines, even though my pipeline is intended to be a streaming pipeline. What am I doing that makes my pipeline batch instead of streaming?
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class PrintExample(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--welcome', type=str)

TOPIC = ...
PROJECT = ...
BUCKET = ...

pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1'
)

with beam.Pipeline(options=pipeline_options) as p:
    options = pipeline_options.view_as(PrintExample)
    (
        p
        | "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
        | "Print" >> beam.Map(print)
    )
    p.run()
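For reference, the `PrintExample` class only registers a template parameter. A minimal stand-alone sketch with plain `argparse` (no Beam) of roughly what that registration does; note this is my simplification, since `add_value_provider_argument` additionally wraps the value in a `ValueProvider` so a staged template can supply it at run time, which plain `argparse` does not model:

```python
import argparse

# Sketch of the flag PrintExample._add_argparse_args registers:
# a plain string option named --welcome.
parser = argparse.ArgumentParser()
parser.add_argument('--welcome', type=str)

# Parsing an example command line ('hello' is a made-up value).
args = parser.parse_args(['--welcome', 'hello'])
print(args.welcome)  # prints: hello
```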
Then I run:

python PrintTemplate.py \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate
resulting in:
ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.