
We are experimenting with Apache Beam (using the Go SDK) and Dataflow to parallelize one of our time-consuming tasks. For a little more context, we have a caching job that takes some queries, runs them against a database, and caches the results. Each database query may take anywhere from a few seconds to many minutes, and we want to run them in parallel so the task completes sooner.

We created a simple pipeline that looks like this:

    // Create initial PCollection.
    startLoad := beam.Create(s, "InitialLoadToStartPipeline")

    // Emits a unit of work along with query and date range.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)

    // Emits a cache response which includes errCode, errMsg, time etc.
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)

    ...

The number of units that getCachePayloadsFn emits is not large: mostly in the hundreds, and at most a few thousand in production.

The issue is that cacheQueryDoFn is not executed in parallel; the queries run sequentially, one by one. We confirmed this by logging the goroutine ID, process ID, and start and end times in StartBundle and ProcessElement of the caching function, and the logs show no overlap in execution.

We want the queries to always run in parallel, even if there are just 10 of them. From our understanding of the documentation, the runner splits the overall input into bundles; bundles run in parallel, and elements within a bundle are processed sequentially. Is there a way to control the number of bundles created from the input, or any other way to increase parallelism?

Things we tried:

  1. Setting num_workers=2 and autoscaling_algorithm=None. This starts two VMs, but the Setup method that initializes the DoFn runs on only one VM, which then handles the entire load.
  2. We found the sdk_worker_parallelism option here, but we are not sure how to set it correctly. We tried beam.PipelineOptions.Set("sdk_worker_parallelism", "50"), which had no effect.

1 Answer


By default, Create is not parallel, and all the DoFns are fused into the same stage as the Create, so they have no parallelism either. See https://beam.apache.org/documentation/runtime/model/#dependent-parallellism for more information on this.

You can explicitly force a fusion break with the Reshuffle transform.