I am using google dataflow on my work.
While I am using dataflow, I need to set number of workers dynamically while dataflow batch job is running.
That's mainly because of cloud bigtable QPS.
We are using 3 bigtable cluster nodes and they can't afford to receiving all traffics from 500 number of workers instantly.
So, I gotta change number of workers(from 500 to 25) just before trying to insert all the processed data into the bigtable.
Is there any way to achieve this goal?
2 Answers
Dataflow does not provide the ability to manually change the resource allocation of a batch job while it is running, however:
1) We plan to incorporate throttling into our autoscaling algorithms, so Dataflow would detect that it needs to downsize while writing to your bigtable. I don't have a concrete ETA, but this is definitely on our roadmap.
2) Meanwhile, you try to can artificially limit the parallelism of your pipeline by a trick like this:
- Take your
PCollection<Something>
(Something
being the data type you're writing to bigtable) - Pipe it through a sequence of transforms:
ParDo(pair with a random key in 0..25)
,GroupByKey
,ParDo(ungroup and remove random key)
. You get, again, aPCollection<Something>
- Write this collection to Bigtable.
The trick here is that there is no parallelization within a single key after a GroupByKey, so the result of GroupByKey is a collection of 25 key-value pairs (where the value is an Iterable<Something>
) that can't be processed by more than 25 workers in parallel. The ParDo's following it will likely get fused together with the writing to Bigtable, and will thus have a parallelism of 25.
The caveat is that Dataflow is within its right to materialize any intermediate collections if it predicts that this will improve performance of the pipeline. It may even do this just for the sake of increasing the degree of parallelism (which goes explicitly against your goal in this example). But if you have an urgent job to run, I believe right now this will probably do what you want.
Meanwhile the only long-term solution I can suggest, until we have throttling, is to use a smaller limit on number of workers, or use a larger Bigtable cluster, or both.
There's a lot of relevant information in the DATA & ANALYTICS: Analyzing 25 billion stock market events in an hour with NoOps on GCP talk from GCP/Next.
FWIW, you can increase the number of nodes of Bigtable before your batch job, give Bigtable a few minutes to adjust, and then start your job. You can turn down the Bigtable cluster when you're done with the batch job.