
I've written a streaming Google Dataflow pipeline in Python using the Beam SDK. There's documentation on how to run it locally and how to set the --runner flag to run it on Dataflow.

I'm now trying to automate deploying it from a CI pipeline (Bitbucket Pipelines, though that's not really relevant). There is documentation on how to 'run' a pipeline, but not really on how to 'deploy' it. The command I've tested with looks like this:

python -m dataflow --runner "DataflowRunner" \
                   --jobName "<jobName>" \
                   --topic "<pub-sub-topic>" \
                   --project "<project>" \
                   --dataset "<dataset>" \
                   --worker_machine_type "n1-standard-2" \
                   --temp_location "gs://<bucket-name>/tmp/"

This runs the job, but because it's streaming, the command never returns. It also handles packaging the code and uploading it to a bucket internally. I know that if I kill the process, the job keeps running on Dataflow, but on a CI server it's difficult to tell whether the deployment actually succeeded or I just killed the process after some timeout.

This seems ridiculous, and I feel like I'm missing something obvious, but how do I package and run this module on Dataflow so that a CI pipeline can reliably tell that it was deployed?

Shouldn't it be deployed as a template? - allan.simon

2 Answers


So yes, it was something dumb.

Basically, when you use the

with beam.Pipeline(options=options) as p:

syntax, the context manager's exit calls run() and then wait_until_finish() under the hood. So the wait was being invoked without me realizing it, which made the process hang around forever. Refactoring to drop the context manager and call p.run() directly, without waiting on the result, fixes the problem.
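To make the mechanism concrete, here is a simplified, hypothetical model of that behaviour. FakePipeline is an invented stand-in, not part of Beam; it only records which methods are called. It assumes, as described above, that leaving the with block calls run() and then wait_until_finish() on a clean exit. In real Beam, wait_until_finish() belongs to the PipelineResult returned by run(); the fake merges the two objects to keep the sketch short.

```python
class FakePipeline:
    """Hypothetical stand-in that records calls, mimicking (in simplified
    form) how beam.Pipeline behaves when used as a context manager."""

    def __init__(self):
        self.calls = []

    def run(self):
        self.calls.append("run")
        return self  # the real run() returns a separate PipelineResult

    def wait_until_finish(self):
        # For a real streaming job this call blocks indefinitely.
        self.calls.append("wait_until_finish")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # On a clean exit, submit the pipeline and then block on it.
        if exc_type is None:
            self.run().wait_until_finish()
        return False  # never swallow exceptions


# Context manager: run() AND wait_until_finish() are both called.
with FakePipeline() as p:
    pass
print(p.calls)  # ['run', 'wait_until_finish']

# Explicit run(): the job is submitted and control returns immediately.
q = FakePipeline()
q.run()
print(q.calls)  # ['run']
```

The second pattern is the one you want in CI: the script exits as soon as the job has been handed to Dataflow.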


To expand on jamielennox's answer.

When using the DirectRunner in your local development environment, you want the pipeline to run indefinitely, perhaps until you cancel it manually with Ctrl-C.

When deploying the pipeline to GCP's Dataflow, you want your script to submit the job and exit.

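import apache_beam as beam

runner_name = pipeline_options.get_all_options().get('runner')

# If --runner is not given, Beam falls back to the DirectRunner.
if runner_name in (None, 'DirectRunner'):
    # Locally: the context manager runs the pipeline and blocks until it
    # finishes (for a streaming job, until you press Ctrl-C).
    with beam.Pipeline(options=pipeline_options) as pipeline:
        _my_setup_pipeline(config, pipeline, subscription_full_name)

elif runner_name == 'DataflowRunner':
    # On Dataflow: build the pipeline, submit it, and return immediately.
    # Deliberately do not call wait_until_finish().
    pipeline = beam.Pipeline(options=pipeline_options)
    _my_setup_pipeline(config, pipeline, subscription_full_name)
    pipeline.run()

else:
    raise ValueError(f'Unknown runner: {runner_name}')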
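For the CI side of the question (knowing whether the deploy actually worked), one option is to turn the submission step of the DataflowRunner branch into a process exit code. The sketch below is an assumption-laden illustration, not an official Beam helper: submit_for_ci and BAD_STATES are hypothetical names, and it assumes that pipeline.run() raises if the job cannot be submitted and that the returned result exposes a state attribute whose values are strings such as "RUNNING" or "FAILED" (as the constants in apache_beam.runners.runner.PipelineState are). It is duck-typed so it does not import Beam itself.

```python
import sys

# Assumed "clearly not deployed" states; the strings are meant to mirror
# values of apache_beam.runners.runner.PipelineState.
BAD_STATES = {"FAILED", "CANCELLED"}


def submit_for_ci(pipeline) -> int:
    """Submit a built pipeline without waiting and return a CI exit code.

    `pipeline` is anything with a run() method that returns an object with
    a `state` attribute, e.g. a beam.Pipeline configured for DataflowRunner.
    """
    try:
        result = pipeline.run()  # returns once the job has been submitted
    except Exception as err:  # submission itself failed (auth, options, ...)
        print(f"Job submission failed: {err}", file=sys.stderr)
        return 1

    state = str(result.state)
    if state in BAD_STATES:
        print(f"Job is in state {state} right after submission", file=sys.stderr)
        return 1

    print(f"Job submitted; current state: {state}")
    return 0
```

In the DataflowRunner branch you would then end the script with something like sys.exit(submit_for_ci(pipeline)), so the CI step fails on a non-zero exit code instead of relying on a timeout.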