3 votes

I've written a streaming Google Dataflow pipeline in Python using the Beam SDK. There's documentation about how to run it locally and set the --runner flag to run it on Dataflow.

I'm now trying to automate the deployment of this pipeline from CI (Bitbucket Pipelines, but that's not really relevant). There is documentation on how to 'run' a pipeline, but not really on how to 'deploy' one. The command I've tested with looks like:

python -m dataflow --runner "DataflowRunner" \
                   --jobName "<jobName>" \
                   --topic "<pub-sub-topic>" \
                   --project "<project>" \
                   --dataset "<dataset>" \
                   --worker_machine_type "n1-standard-2" \
                   --temp_location "gs://<bucket-name>/tmp/"

This will run the job, but because it's streaming, the command never returns. It also internally manages the packaging and pushing to a bucket. I know the job keeps running on Dataflow if I kill that local process, but setting this up on a CI server in a way where I can detect whether the deployment actually succeeded, rather than my having just killed the process after some timeout, is difficult.

This seems ridiculous, and like I'm missing something obvious: how do I package and run this module on Dataflow from a CI pipeline in a way that lets me reliably know it deployed?

2  Shouldn't it be deployed as a template? – allan.simon

2 Answers

2 votes

So yes, it was something dumb.

Basically, when you use the

with beam.Pipeline(options=options) as p:

syntax, the context manager calls wait_until_finish() under the hood when it exits. So the wait was being invoked without me realizing it, which is why the process hung around forever. Refactoring to remove the context manager fixes the problem.
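
For illustration, a minimal sketch of that refactor (build_transforms is a hypothetical stand-in for whatever applies your transforms):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()  # in practice, parsed from sys.argv

# No context manager, so nothing calls wait_until_finish() for us.
pipeline = beam.Pipeline(options=options)
build_transforms(pipeline)  # hypothetical: apply your PTransforms here

# run() submits the job and returns once submission completes on
# DataflowRunner, so a CI step can exit cleanly.
pipeline.run()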

0 votes

To expand on jamielennox's answer.

When running on the direct runner in your local development environment, you want to see the pipeline run indefinitely, perhaps only cancelling it manually with Ctrl-C after a while.

When deploying the pipeline to run on GCP's Dataflow, you want your script to submit the job and then exit.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()  # parsed from the command line
runner_name = pipeline_options.get_all_options().get('runner')

if runner_name == 'DirectRunner':
    # Local run: the context manager blocks in wait_until_finish()
    # until the streaming pipeline is cancelled.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        _my_setup_pipeline(config, pipeline, subscription_full_name)

elif runner_name == 'DataflowRunner':
    # Deployment: run() submits the job to Dataflow and returns,
    # so the script (and the CI step) can exit.
    pipeline = beam.Pipeline(options=pipeline_options)
    _my_setup_pipeline(config, pipeline, subscription_full_name)
    pipeline.run()

else:
    raise Exception(f'Unknown runner: {runner_name}')
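
If the CI step also needs something concrete to record, one option (a sketch, assuming the result object returned by the Dataflow runner, which exposes the submitted job's id) is to capture the return value of run():

result = pipeline.run()
# On DataflowRunner this is a DataflowPipelineResult; logging the
# job id gives the CI pipeline evidence that submission succeeded.
print(f'Submitted Dataflow job: {result.job_id()}')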