We want to reduce the cost of running a specific Apache Beam pipeline (Python SDK) on GCP Dataflow.

We have built a memory-intensive Apache Beam pipeline that requires approximately 8.5 GB of RAM per executor. A large machine learning model is currently loaded in the DoFn.setup method of a transform so that we can precompute recommendations for a few million users.

The existing GCP Compute Engine machine types either have a lower memory/vCPU ratio than we require (up to 8 GB RAM per vCPU) or a much higher one (24 GB RAM per vCPU): https://cloud.google.com/compute/docs/machine-types#machine_type_comparison

We have successfully run this pipeline using the GCP m1-ultramem-40 machine type. However, the hardware usage, and therefore the cost, was sub-optimal. This machine type has a ratio of 24 GB RAM per vCPU. When running the pipeline on it, the VMs used less than 36% of the available memory, but, as expected, we paid for all of it.

When we attempted to run the same pipeline using a custom-2-13312 machine type (2 vCPUs and 13 GB RAM), Dataflow crashed with the error:

   Root cause: The worker lost contact with the service.

While monitoring the Compute Engine instances running the Dataflow job, it was clear that they were running out of memory. Dataflow tried to load the model into memory twice (once per vCPU), but the available memory was only enough for one copy.
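A quick back-of-the-envelope check of that explanation, assuming (as observed here) one model copy per vCPU at roughly 8.5 GB each, and ignoring OS and worker overhead. The 961 GB figure for m1-ultramem-40 comes from the Compute Engine machine-type table, not from the measurements above; the function names are illustrative only.

```python
"""Does one ~8.5 GB model copy per vCPU fit in a VM's RAM?"""

MODEL_GB = 8.5  # approximate per-executor footprint of the model


def model_memory(vcpus: int, model_gb: float = MODEL_GB) -> float:
    """Memory needed if one model copy is loaded per vCPU (assumption)."""
    return vcpus * model_gb


def fits(vcpus: int, ram_gb: float, model_gb: float = MODEL_GB) -> bool:
    """True if all model copies fit in the VM's RAM (overhead ignored)."""
    return model_memory(vcpus, model_gb) <= ram_gb


def utilization(vcpus: int, ram_gb: float, model_gb: float = MODEL_GB) -> float:
    """Fraction of the VM's RAM occupied by the model copies."""
    return model_memory(vcpus, model_gb) / ram_gb


# Machine shapes as (vCPUs, RAM in GB); custom-2-13312 has 13312 MB = 13 GB.
MACHINES = {
    "m1-ultramem-40": (40, 961.0),
    "custom-2-13312": (2, 13312 / 1024),
}

for name, (vcpus, ram_gb) in MACHINES.items():
    print(
        f"{name}: needs {model_memory(vcpus):.1f} GB of {ram_gb:.1f} GB, "
        f"fits={fits(vcpus, ram_gb)}, utilization={utilization(vcpus, ram_gb):.0%}"
    )
```

Under these assumptions m1-ultramem-40 comes out at about 35% utilization, in line with the "less than 36%" observed above, while custom-2-13312 would need 17 GB for two copies but only has 13 GB.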

If we could tell Apache Beam/Dataflow that a particular transform requires a specific amount of memory, the problem would be solved, but we did not find a way of achieving this.

The other solution we could think of was to change the ratio of Dataflow executors per Compute Engine VM. This would allow us to find a ratio that wastes as little vCPU as possible while respecting the pipeline's memory requirements. Using the previously mentioned custom-2-13312 machine type, we attempted to run the pipeline with the following configurations:

  1. --number_of_worker_harness_threads=1 --experiments=use_runner_v2
  2. --experiments=no_use_multiple_sdk_containers --experiments=beam_fn_api
  3. --sdk_worker_parallelism=1

When using (1), we managed to have a single thread, but Dataflow spawned two Python executor processes per VM. The pipeline crashed because it attempted to load the model into memory twice when there was only enough space for one copy.

When using (2), a single Python process was spawned per VM, but it ran two threads. Each of those threads tried to load the model, and the VM ran out of memory. Approach (3) had a very similar outcome to (1) and (2).

It was not possible to combine several of these configurations.

Is there a configuration (or set of configurations) that would give us control over the number of Dataflow executors per VM?

Are there any other alternatives for reducing the costs that we might not have thought of?

I have the same problem (I think). How did you check the memory usage of the job? The job metrics tab seems to show only CPU usage. - Kazuki
I profiled the memory on the Compute Engine instances that were running the pipeline. A simple way of doing this is to SSH into the VMs and use top. You should see the available memory decreasing until the VM no longer has any memory available and it gets killed. - Tatiana Al-Chueyr
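If you would rather log numbers than watch top, a small script reading /proc/meminfo on the VM (Linux only) gives the same signal. This is a hypothetical helper, not a Dataflow feature; MemAvailable is the kernel's estimate of memory available for new allocations, reported in kB.

```python
import time


def parse_meminfo(text: str) -> dict:
    """Parse /proc/meminfo-style text into {field: value in kB}."""
    values = {}
    for line in text.splitlines():
        if ":" not in line:
            continue
        key, rest = line.split(":", 1)
        parts = rest.split()
        if parts:
            values[key.strip()] = int(parts[0])  # first token is the number
    return values


def available_gib(text: str) -> float:
    """MemAvailable converted from kB (KiB) to GiB."""
    return parse_meminfo(text)["MemAvailable"] / (1024 ** 2)


def watch(interval_s: float = 5.0, samples: int = 3) -> None:
    """Print available memory a few times, sleeping between samples."""
    for _ in range(samples):
        with open("/proc/meminfo") as f:
            print(f"MemAvailable: {available_gib(f.read()):.2f} GiB")
        time.sleep(interval_s)
```

Calling watch(interval_s=10, samples=60) while the job starts should show the drop as each model copy is loaded.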

2 Answers

3
votes

We are working on long-term solutions to these problems, but here is a tactical fix that should prevent the model duplication that you saw in approaches 1 and 2:

Share the model across workers in a VM, to avoid it being duplicated in each worker. Use the following utility (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py), which is available out of the box in Beam 2.24. If you are using an earlier version of Beam, copy just shared.py into your project and use it as user code.

2
votes

I don't think there is currently an option to control the number of executors per VM. It seems the closest you will get is by using option (1) and assuming one Python executor per core.

Option (1)

--number_of_worker_harness_threads=1 --experiments=use_runner_v2

To get the CPU/memory ratio you need, I'd suggest using custom machine types with extended memory. This approach should be more cost-effective.

For example, running a single executor with a single thread on an n1-standard-4 machine (4 vCPUs, 15 GB) will be roughly 30% more expensive than running the same workload on a custom-1-15360-ext (1 vCPU, 15 GB) custom machine.
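To compare shapes like these, it can help to decode the custom machine type names into memory per vCPU. This sketch only handles the N1 form custom-VCPUS-MEMORY_MB with an optional -ext suffix; parse_custom and CustomShape are hypothetical helpers. N1 custom types are limited to 6.5 GB per vCPU unless extended memory is enabled, which is why a single vCPU with 15 GB needs the -ext variant.

```python
import re
from typing import NamedTuple

# N1 custom machine type names: custom-<vCPUs>-<memory in MB>[-ext]
_CUSTOM_RE = re.compile(r"^custom-(\d+)-(\d+)(-ext)?$")


class CustomShape(NamedTuple):
    vcpus: int
    memory_mb: int
    extended: bool

    @property
    def gb_per_vcpu(self) -> float:
        """Memory per vCPU, treating 1 GB as 1024 MB like Compute Engine does."""
        return self.memory_mb / 1024 / self.vcpus


def parse_custom(machine_type: str) -> CustomShape:
    """Decode an N1 custom machine type name; raise ValueError otherwise."""
    m = _CUSTOM_RE.match(machine_type)
    if m is None:
        raise ValueError(f"not an N1 custom machine type: {machine_type!r}")
    return CustomShape(int(m.group(1)), int(m.group(2)), m.group(3) is not None)
```

For example, parse_custom("custom-2-13312").gb_per_vcpu is 6.5, short of one 8.5 GB model per vCPU, whereas custom-1-15360-ext gives its single vCPU 15 GB.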