2
votes

I've successfully run the following pipeline/job on the Dataflow service for some kinds. However, one of my kinds consistently fails with the error below. Any ideas?

import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.PCollection;


public class DatastoreExperiment {
    public static void main(String[] args) {
        String dataset = "mailfoogae";
        String kind = "TrackedThread";


        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        DatastoreV1.KindExpression entityKind = DatastoreV1.KindExpression.newBuilder().setName(kind).build();
        DatastoreV1.Query entityQuery = DatastoreV1.Query.newBuilder().addKind(entityKind).build();


        PCollection<DatastoreV1.Entity> entities = p.apply(DatastoreIO.readFrom(dataset, entityQuery));
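        // Count the entities of this kind: map each entity to a 1, sum globally,
        // convert the count to a String, and write it to a file on GCS.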
        entities.apply(ParDo.of(new DoFn<DatastoreV1.Entity, Integer>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                c.output(1);
            }
        }))
        .apply(Combine.globally(new Sum.SumIntegerFn()))
        .apply(ParDo.of(new DoFn<Integer, String>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                c.output(c.element().toString());
            }
        }))
        .apply(TextIO.Write.named("WriteMyFile-" + kind).to("gs://temp.streak.com/outputData-" + kind + ".txt"));

        p.run();

    }
}

and the following error is produced:

Apr 17, 2015, 2:45:59 PM (9c3980cbff15b512): java.io.IOException: Failed to start reading from source: Datastore: host https://www.googleapis.com; dataset mailfoogae; query: kind { name: "TrackedLink" } filter { composite_filter { operator: AND filter { property_filter { property { name: "__key__" } operator: GREATER_THAN_OR_EQUAL value { key_value { partition_id { dataset_id: "s~mailfoogae" } path_element { kind: "TrackedLink" name: "5f8e8209-617f-44d0-95df-36dcdea79151" } } } } } filter { property_filter { property { name: "__key__" } operator: LESS_THAN value { key_value { partition_id { dataset_id: "s~mailfoogae" } path_element { kind: "TrackedLink" name: "7d1e71de-9a37-4063-9569-33c0dad538fe" } } } } } } }
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:321)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:174)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:121)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:130)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:95)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:139)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:124)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: Backend Error
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:721)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:712)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:313)
    ... 11 more
Caused by: com.google.api.services.datastore.client.DatastoreException: Backend Error
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:771)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:719)
    ... 13 more
Caused by: com.google.api.client.http.HttpResponseException: 503 Service Unavailable Backend Error
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 17 more
So, just to clarify: this same pipeline works fine with other kinds, but gives this error with this particular kind? 1) Does it work if you run using DirectPipelineRunner? (i.e. locally rather than on the service) 2) Are you able to access this kind's data without Dataflow, directly in the Datastore browser? - jkff
1) Yes but incredibly slow - aloo
So, seems that the problem is that Dataflow's Datastore reader specifies too high of a query limit, which is triggering errors in the service. Can you try running your pipeline with a locally-built Dataflow SDK with QUERY_LIMIT in DatastoreIO.DatastoreReader (github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/…) changed from 5000 to 500? Meanwhile I'll try to get this change into the next SDK release. Datastore team also says they're making improvements to make this sort of tuning unnecessary, but for now it is. - jkff
Sure - I'm suggesting you run on the Dataflow service in exactly the same way you did before, but with a modified SDK. For that, you can check out the SDK from GitHub, make this fix, and use Maven to rebuild the artifact and install it locally. Then make your program depend on this locally-built artifact version and rerun. - jkff
The fix has been pushed to GitHub github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/… - however, Maven artifact releases are less frequent, currently on the order of once every few weeks. - jkff

1 Answer

1
votes

This was an issue with Dataflow's Datastore reader specifying too high a query limit, which was triggering errors in the Datastore service. The SDK has been updated with a fix: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/c0d5d58b0e017fdcc1406efed78dbbf93c535265
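
For reference, here is a minimal sketch of the kind of change described above, assuming the limit lives in a constant named QUERY_LIMIT inside DatastoreIO.DatastoreReader (the constant name and the 5000 -> 500 values come from the comments; the surrounding class layout is illustrative, not copied from the SDK source):

// Illustrative sketch only - the enclosing class structure is assumed here;
// the real edit is made in DatastoreIO.java in the Dataflow Java SDK.
public class DatastoreIO {
    static class DatastoreReader {
        // Was 5000 in the released SDK. Lowering the per-request batch size keeps each
        // runQuery call small enough that Datastore stops answering with 503 Backend Error.
        private static final int QUERY_LIMIT = 500;
        // ... rest of the reader unchanged ...
    }
}

Until the fixed artifact is published, you can rebuild and install the SDK locally with Maven (mvn install) and point your project at the locally installed artifact version, as suggested in the comments.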