We are experimenting with Apache Beam (using the Go SDK) and Dataflow to parallelize one of our time-consuming tasks. For a little more context: we have a caching job which takes a set of queries, runs them against a database, and caches the results. Each database query may take from a few seconds to many minutes, and we want to run the queries in parallel for quicker task completion.
We created a simple pipeline that looks like this:
// Create initial PCollection.
startLoad := beam.Create(s, "InitialLoadToStartPipeline")
// Emits a unit of work along with query and date range.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
// Emits a cache response which includes errCode, errMsg, time etc.
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
...
The number of units which getCachePayloadsFn emits is not large: mostly in the hundreds, and at most a few thousand in production.
Now the issue is that cacheQueryDoFn is not getting executed in parallel: the queries run sequentially, one by one. We confirmed this by putting logs in StartBundle and ProcessElement of the caching function, logging the goroutine id, process id, and start and end times, which showed no overlap in execution.
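For reference, the instrumentation looked roughly like the sketch below. cachePayload, cacheResponse, and runQuery are illustrative stand-ins for our actual types and query logic; the goroutine id is parsed out of runtime.Stack since Go does not expose it directly.

package main

import (
	"context"
	"log"
	"os"
	"runtime"
	"strings"
	"time"
)

type cachePayload struct{ Query string }

type cacheResponse struct {
	ErrCode int
	ErrMsg  string
}

// goroutineID parses the goroutine number out of the first line of a
// stack trace ("goroutine 123 [running]:").
func goroutineID() string {
	buf := make([]byte, 64)
	n := runtime.Stack(buf, false)
	return strings.Fields(string(buf[:n]))[1]
}

type cacheQueryDoFn struct{ /* Config omitted */ }

func (f *cacheQueryDoFn) StartBundle(ctx context.Context, emit func(cacheResponse)) {
	log.Printf("StartBundle pid=%d goroutine=%s at=%s",
		os.Getpid(), goroutineID(), time.Now().Format(time.RFC3339Nano))
}

func (f *cacheQueryDoFn) ProcessElement(ctx context.Context, p cachePayload, emit func(cacheResponse)) {
	start := time.Now()
	log.Printf("ProcessElement start pid=%d goroutine=%s query=%q",
		os.Getpid(), goroutineID(), p.Query)
	resp := runQuery(ctx, p) // the slow database call
	log.Printf("ProcessElement end pid=%d goroutine=%s took=%s",
		os.Getpid(), goroutineID(), time.Since(start))
	emit(resp)
}

func runQuery(ctx context.Context, p cachePayload) cacheResponse {
	// Placeholder for the actual database call.
	return cacheResponse{}
}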
We want the queries to always run in parallel, even if there are only 10 of them. From our understanding of the documentation, Beam creates bundles from the overall input; the bundles run in parallel, but elements within a bundle run sequentially. Is there a way to control the number of bundles created from the load, or any other way to increase parallelism?
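To make the question concrete: would inserting an explicit redistribution step, e.g. beam.Reshuffle from the Go SDK, be the intended way to spread these elements across more bundles and workers? A minimal sketch of what we mean follows; we have not verified whether this actually changes bundling on Dataflow.

// Hypothetical variant: reshuffle the payloads before the expensive ParDo
// so the runner has a chance to redistribute them. Untested on Dataflow.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
redistributed := beam.Reshuffle(s, cachePayloads)
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, redistributed)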
Things we tried:
- Keeping num_workers=2 and autoscaling_algorithm=None. It starts two VMs, but runs the Setup method to initialize the DoFn on only one VM and uses that one for the entire load.
- Found the sdk_worker_parallelism option here. But we are not sure how to set it correctly. We tried setting it with beam.PipelineOptions.Set("sdk_worker_parallelism", "50"), with no effect.
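As a last resort we could batch the payloads upstream and fan out ourselves with goroutines inside a single ProcessElement, as sketched below, but that sidesteps the runner and is exactly the work we hoped Beam would do for us. The slice input assumes the payloads were first grouped into one batch; the types and runQuery are the illustrative ones from the sketch above, plus "sync" in the imports. Results are collected and emitted only after wg.Wait, since we do not want to call the emitter from multiple goroutines.

// Illustrative fallback only: run the queries concurrently ourselves inside
// one ProcessElement call, assuming payloads were batched into a slice.
func (f *cacheQueryDoFn) ProcessElement(ctx context.Context, payloads []cachePayload, emit func(cacheResponse)) {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results []cacheResponse
	)
	for _, p := range payloads {
		wg.Add(1)
		go func(p cachePayload) {
			defer wg.Done()
			resp := runQuery(ctx, p) // slow database call
			mu.Lock()
			results = append(results, resp)
			mu.Unlock()
		}(p)
	}
	wg.Wait()
	// Emit sequentially once all goroutines are done.
	for _, r := range results {
		emit(r)
	}
}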