
I'm using Apache Beam on Dataflow through the Python API to read data from BigQuery, process it, and dump it into a Datastore sink.

Unfortunately, quite often the job just hangs indefinitely and I have to stop it manually. While most of the data gets written into Datastore and Redis, the Dataflow graph shows that it's only a couple of entries that get stuck and leave the job hanging.

As a result, when a job with fifteen 16-core machines is left running for 9 hours (normally, the job runs for 30 minutes), it leads to huge costs.

Is there a way to set a timer that would stop a Dataflow job if it exceeds a time limit?


2 Answers


It would be great if you could create a customer support ticket, so that we could try to debug this with you.

Is there a way to set a timer that would stop a Dataflow job if it exceeds a time limit?

Unfortunately, the answer is no: Dataflow does not have a built-in way to cancel a job after a certain time. However, you can do this through the API: call wait_until_finish() with a timeout and then cancel() the pipeline.

You would do this like so:

p = beam.Pipeline(options=pipeline_options)
p | ...  # Define your pipeline code here

pipeline_result = p.run()  # submits the job and returns without blocking
pipeline_result.wait_until_finish(duration=TIME_DURATION_IN_MS)  # blocks until the job finishes or the timeout (in ms) elapses
pipeline_result.cancel()   # if the pipeline has not finished, this cancels it
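
For reuse, the same pattern can be wrapped in a small helper. This is a minimal sketch, where run_with_timeout and its timeout_ms parameter are hypothetical names of my own, and PipelineState comes from the Beam Python SDK:

import logging

from apache_beam.runners.runner import PipelineState

def run_with_timeout(pipeline, timeout_ms):
    """Run a pipeline and cancel it if it exceeds timeout_ms milliseconds."""
    result = pipeline.run()  # submits the job and returns immediately
    state = result.wait_until_finish(duration=timeout_ms)
    if state not in (PipelineState.DONE, PipelineState.FAILED, PipelineState.CANCELLED):
        # Job is not in a terminal state after the timeout, so cancel it.
        logging.warning("Job still in state %s after %d ms, cancelling", state, timeout_ms)
        result.cancel()
    return result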

To sum up, with the help of @ankitk's answer, this works for me (Python 2.7, SDK 2.14):

pipe = beam.Pipeline(options=pipeline_options)
...  # main pipeline code
run = pipe.run()  # submits the job and returns immediately
run.wait_until_finish(duration=3600000)  # blocks for up to one hour (duration is in ms)
run.cancel()  # cancels the job if it can still be cancelled

Thus, if the job finishes successfully within the duration passed to wait_until_finish(), cancel() will just print an "already closed" warning; otherwise it will cancel the still-running job.

P.S. If you print the state of the job

state = run.wait_until_finish(duration=3600000)  # returns the job state
logging.info(state)

it will be RUNNING for a job that did not finish within wait_until_finish(), and DONE for a job that did.
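
Building on that, the returned state can be used to cancel only when the job is actually still running. A minimal sketch, assuming PipelineState from the Beam Python SDK:

from apache_beam.runners.runner import PipelineState

state = run.wait_until_finish(duration=3600000)  # (ms)
if state == PipelineState.RUNNING:
    run.cancel()  # the job exceeded the one-hour limit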