6 votes

I'm having huge performance issues with Datastore write speed. Most of the time it stays under 100 elements/s.

I was able to achieve speeds of around 2,600 elements/s when benchmarking the write speed on my local machine using the Datastore client (com.google.cloud:google-cloud-datastore) and running the batched writes in parallel.
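
Roughly, that benchmark looked like the sketch below (the kind name, payload field, batch count and thread-pool size are arbitrary; it assumes default credentials and project):

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.FullEntity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DatastoreWriteBenchmark {
    public static void main(String[] args) throws InterruptedException {
        // Uses the default project and credentials; "someKind" and the sizes below are arbitrary.
        Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
        KeyFactory keyFactory = datastore.newKeyFactory().setKind("someKind");
        ExecutorService pool = Executors.newFixedThreadPool(8);

        for (int b = 0; b < 100; b++) {
            List<FullEntity<?>> batch = new ArrayList<>();
            for (int i = 0; i < 500; i++) {
                Key key = keyFactory.newKey(UUID.randomUUID().toString());
                batch.add(Entity.newBuilder(key).set("payload", "value").build());
            }
            // Each batch of 500 entities goes out as one RPC; several batches are in flight at once.
            pool.submit(() -> datastore.put(batch.toArray(new FullEntity<?>[0])));
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
    }
}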

I've set up a simple Apache Beam pipeline using the Java API. Here's its graph:

[image: full-pipeline]

Here's the speed when running without the Datastore write node:

[image: pipeline-without-datastore-write]

It is much faster this way. Everything points to DatastoreV1.Write being the bottleneck in this pipeline, judging both by the speed of the pipeline without the write node and by the wall time of DatastoreV1.Write compared to the wall time of the other nodes.
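
For context, the pipeline is shaped roughly like this minimal sketch (not the exact code; the file pattern, project ID, kind and payload property are placeholders, and the entities are the com.google.datastore.v1 protos that DatastoreIO expects):

pipeline
        .apply(TextIO.read().from("gs://my-bucket/input-*"))          // read source lines
        .apply(ParDo.of(new DoFn<String, Entity>() {                  // build Datastore entities
            @ProcessElement
            public void processElement(ProcessContext c) {
                Key.Builder keyBuilder =
                        DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());
                c.output(Entity.newBuilder()
                        .setKey(keyBuilder)
                        .putProperties("payload", DatastoreHelper.makeValue(c.element()).build())
                        .build());
            }
        }))
        .apply(DatastoreIO.v1().write().withProjectId("my-project")); // the slow node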


Approaches I've tried to solve this:

• Increasing the number of initial workers (tried 1 and 10, with no noticeable difference; see the options sketch after this list). Dataflow scales the number of workers down to 1 after some time (probably once the first two nodes finish processing). Based on that, I suspect that DatastoreIO.v1().write() does not run its workers in parallel. Why, though?

[image: pipeline-log-workers]

• Making sure everything runs in the same location: the GCP project, the Dataflow pipeline workers & metadata, and the storage are all set to us-central, as suggested here.

• Trying different entity key generation strategies (per this post). Currently I'm using this approach: Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());. I'm not certain this generates keys that are distributed evenly enough, but even if it doesn't, the performance shouldn't be this low, should it?
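
For the worker-count experiments mentioned in the first bullet, the options were set roughly as follows (a minimal sketch assuming the Dataflow runner; the project ID is a placeholder):

DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");    // placeholder project ID
options.setRegion("us-central1");    // keep workers in the same location as Datastore
options.setNumWorkers(10);           // initial workers; tried 1 and 10
options.setMaxNumWorkers(10);
Pipeline pipeline = Pipeline.create(options);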


Please note that I was unable to use the provided Apache Beam & Google libraries without workarounds: I had to force the google-api-client version to 1.22.0 and Guava to 23.0 due to their dependency conflicts (see for example https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/607).

Looking at the DatastoreV1.Write node log:

[image: datastore-log-write]

It pushes a batch of 500 entities roughly every 5 s, which is not very fast.

Overall it looks like DatastoreIO.v1().write() is slow and its workers are not being run in parallel. Any idea how to fix this, or what the cause could be?


1 Answer

7 votes

I should not leave this question unanswered.

After reaching out to GCP support, I was given the suggestion that the cause could be the TextIO.Read node reading from compressed (gzipped) files. Apparently this is a non-parallelizable operation. Indeed, after switching the source to uncompressed files, performance improved.
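
For reference, the change boils down to something like this on the read side (paths are placeholders): a gzipped file is read as a single unsplittable source, while uncompressed, sharded files can be split across workers.

// Before: a single gzipped file cannot be split, so one worker reads everything.
pipeline.apply(TextIO.read().from("gs://my-bucket/input.gz").withCompression(Compression.GZIP));

// After: uncompressed (and ideally sharded) files can be read in parallel.
pipeline.apply(TextIO.read().from("gs://my-bucket/input-*.txt"));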

Another suggested solution was to manually repartition the pipeline after reading from the source. This means adding an arbitrary key to each element, grouping by that key, and then removing it again. This works as well. The approach comes down to the following code:

Pipeline code:

pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP))
        .apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()))
        /* further transforms */

The helper class:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

import java.util.concurrent.ThreadLocalRandom;

public class PipelineRepartitioner<T> {
    /** Pairs every element with a random key so the following GroupByKey spreads the work across workers. */
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    /** Drops the arbitrary key again, emitting the original elements. */
    public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}

I've seen tickets related to this issue in the Apache Beam Jira, so it may be fixed in the future.