
I have a small Dataflow job triggered from a Cloud Function using a Dataflow template. The job reads from a table in BigQuery, converts each resulting TableRow to a key-value pair, and writes that pair to Datastore.

This is what my code looks like:

PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQuery()).usingStandardSql()
                        .withoutValidation());

bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext pc) {
                TableRow row = pc.element();

                // A Datastore client is created and a single put() is issued per element.
                Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
                KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
                        .setKind("MyKind");

                Key key = keyFactoryCounts.newKey("Key");
                Builder builder = Entity.newBuilder(key);
                builder.set("Key", StringValue.newBuilder("Value").setExcludeFromIndexes(true).build());

                Entity entity = builder.build();
                datastore.put(entity);
            }
        }));

This pipeline runs fine when the number of records I try to process is anywhere in the range of 1 to 100. However, when I put more load on the pipeline, i.e. ~10,000 records, it does not scale (even though autoscaling is set to THROUGHPUT_BASED and maxNumWorkers is set as high as 50 with an n1-standard-1 machine type). The job keeps processing 3 or 4 elements per second with one or two workers, and this is hurting the performance of my system.
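For context, the scaling-related options are set roughly as below (a sketch; in my actual setup these values come in through template parameters, and the class name is just illustrative):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PipelineSetup {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        // Scaling-related settings described above
        options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
        options.setMaxNumWorkers(50);                  // upper bound for autoscaling
        options.setWorkerMachineType("n1-standard-1"); // worker machine type
        Pipeline p = Pipeline.create(options);
        // ... BigQueryRead / WriteFromBigqueryToDatastore transforms from above ...
        p.run();
    }
}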

Any advice on how to scale up the performance is very welcome. Thanks in advance.


2 Answers

1 vote

I found a solution by using DatastoreIO instead of the Datastore client. Following is the snippet I used:

    PCollection<TableRow> row = p.apply("BigQueryRead",
            BigQueryIO.readTableRows().withTemplateCompatibility()
                    .fromQuery(options.getQueryForSegmentedUsers()).usingStandardSql()
                    .withoutValidation());

    PCollection<com.google.datastore.v1.Entity> userEntity = row.apply("ConvertTablerowToEntity",
            ParDo.of(new DoFn<TableRow, com.google.datastore.v1.Entity>() {

        @SuppressWarnings("deprecation")
        @ProcessElement
        public void processElement(ProcessContext pc) {
            final String namespace = "MyNamespace";
            final String kind = "MyKind";

            // Build the ancestor key for this kind/namespace.
            com.google.datastore.v1.Key.Builder ancestorKeyBuilder = DatastoreHelper.makeKey(kind, "root");
            if (namespace != null) {
                ancestorKeyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
            }
            final com.google.datastore.v1.Key ancestorKey = ancestorKeyBuilder.build();

            TableRow row = pc.element();
            String entityProperty = "sample";
            String key = "key";

            // Convert the TableRow into a Datastore v1 Entity proto.
            com.google.datastore.v1.Entity.Builder entityBuilder = com.google.datastore.v1.Entity.newBuilder();
            com.google.datastore.v1.Key.Builder keyBuilder = DatastoreHelper.makeKey(ancestorKey, kind, key);
            if (namespace != null) {
                keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
            }

            entityBuilder.setKey(keyBuilder.build());
            entityBuilder.getMutableProperties().put(entityProperty, DatastoreHelper.makeValue("sampleValue").build());
            pc.output(entityBuilder.build());
        }

    }));

    userEntity.apply("WriteToDatastore", DatastoreIO.v1().write().withProjectId(options.getProject()));

This solution was able to scale from 3 elements per second with 1 worker to ~1500 elements per second with 20 workers.

0 votes

At least with Python's ndb client library, it is possible to write up to 500 entities at a time in a single .put_multi() Datastore call, which is a whole lot faster than calling .put() for one entity at a time (the calls block on the underlying RPCs).

I'm not a Java user, but a similar technique appears to be available for Java as well. From Using batch operations:

You can use the batch operations if you want to operate on multiple entities in a single Cloud Datastore call.

Here is an example of a batch call:

Entity employee1 = new Entity("Employee");
Entity employee2 = new Entity("Employee");
Entity employee3 = new Entity("Employee");
// ...

List<Entity> employees = Arrays.asList(employee1, employee2, employee3);
datastore.put(employees);
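Applied to the pipeline in the question (which uses the Cloud Datastore client rather than the App Engine API shown above), a rough sketch of the same idea would be to buffer entities in the DoFn and flush them with a single put() per batch, e.g. in @FinishBundle. The buildEntity() mapping and the "id" column below are placeholders for whatever the actual schema is:

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.FullEntity;
import com.google.cloud.datastore.KeyFactory;

// Sketch: batch Datastore writes inside the DoFn instead of one put() per element.
class BatchedDatastoreWriteFn extends DoFn<TableRow, Void> {
    private static final int MAX_BATCH_SIZE = 500; // Datastore's per-call entity limit
    private transient Datastore datastore;
    private transient List<FullEntity<?>> batch;

    @StartBundle
    public void startBundle() {
        datastore = DatastoreOptions.getDefaultInstance().getService();
        batch = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext pc) {
        batch.add(buildEntity(pc.element()));
        if (batch.size() >= MAX_BATCH_SIZE) {
            flush();
        }
    }

    @FinishBundle
    public void finishBundle() {
        if (!batch.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // One RPC for up to 500 entities instead of one RPC per entity.
        datastore.put(batch.toArray(new FullEntity<?>[0]));
        batch.clear();
    }

    private FullEntity<?> buildEntity(TableRow row) {
        // Placeholder mapping; mirrors the key/property setup from the question.
        KeyFactory keyFactory = datastore.newKeyFactory()
                .setNamespace("MyNamespace")
                .setKind("MyKind");
        return Entity.newBuilder(keyFactory.newKey(String.valueOf(row.get("id")))) // "id" is illustrative
                .set("sample", "sampleValue")
                .build();
    }
}

That said, DatastoreIO.v1().write(), as used in the accepted answer, already batches mutations internally, so hand-rolled batching is mainly useful if you want to stay on the plain Datastore client.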