3 votes

I have a Google App Engine application where data is stored in the Google Cloud Datastore. I want to use Dataflow to put part of that data into BigQuery, but I thought I'd start with just getting some information from the Datastore and writing it to Google Cloud Storage. My code looks like this:

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.api.services.datastore.DatastoreV1.Value;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class DatastorePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(DatastorePipeline.class);

    static class GetEmailFn extends DoFn<Entity, String> {

        @Override
        public void processElement(ProcessContext c) throws Exception {
            Map<String, Value> properties = DatastoreHelper.getPropertyMap(c.element());
            Value value = properties.get("email_address");
            if (value != null) {
                c.output(DatastoreHelper.getString(value));
            }
        }
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName("User");
        Query query = q.build();

        DatastoreIO.Source source = DatastoreIO.source()
            .withDataset("my-project-id")
            .withQuery(query);

        p.apply("ReadUsersFromDatastore", Read.from(source))
            .apply(ParDo.named("GetEmailAddress").of(new GetEmailFn()))
            .apply(TextIO.Write.to("gs://dataflow-output-bucket/emails.txt"));

        p.run();
    }
}

However, when I try to run this, the Datastore query fails with 403 errors:

Request failed with code 403, will NOT retry: https://www.googleapis.com/datastore/v1beta2/datasets/my-project-id/runQuery

I'm running this from Eclipse with the Google Cloud Dataflow plugin. Running Dataflow jobs that don't include Datastore reads works just fine. I did a

gcloud auth login

before running the job, as described in the tutorial. What am I doing wrong?
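
For reference, the job is launched with Dataflow pipeline options roughly like the following; the project and bucket names here are the same placeholders used in the code above:

--project=my-project-id
--runner=BlockingDataflowPipelineRunner
--stagingLocation=gs://dataflow-output-bucket/staging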

edit: Here is the full stacktrace.

Oct 11, 2015, 12:03:13 PM (b6119cca307b4d9a): com.google.api.services.datastore.client.DatastoreException: Unauthorized.
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.api.services.datastore.client.QuerySplitterImpl.getScatterKeys(QuerySplitterImpl.java:189)
    at com.google.api.services.datastore.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:75)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.getSplitQueries(DatastoreIO.java:427)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.splitIntoBundles(DatastoreIO.java:306)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSplit(BasicSerializableSourceFormat.java:318)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSourceOperation(BasicSerializableSourceFormat.java:167)
    at com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.execute(SourceOperationExecutor.java:80)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:257)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:146)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:164)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:145)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.http.HttpResponseException: 403 Forbidden Unauthorized.
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1061)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 19 more

Answered: It turned out that the problem was that my project was restricting access based on my company's domain, which was preventing the service account from connecting. Thanks Dan for helping me work through it!

Sorry you're having this trouble. Can you edit the question to include the full stacktrace of the error? - jkff
It looks like you're running with the DataflowPipelineRunner (or the Blocking variant). Does the same error occur with the DirectPipelineRunner? - Dan Halperin
Also, this error happens every time, not occasionally, correct? - Dan Halperin
This error happens every time, and only on the DataflowPipelineRunner/Blocking runner. It does not occur on the DirectPipelineRunner. - Herbert Lee

1 Answer

2 votes

It looks like the permissions for your Datastore are not configured correctly.

Here are two generic pieces of advice:

  1. It is useful to review the Google Cloud Dataflow Security and Permissions document.
  2. Was the Datastore created in the same project as the one you're running the jobs in?
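
On the second point, a minimal sketch of keeping the two in sync is to derive the Datastore dataset ID from the job's own project option rather than hard-coding it. This reuses the SDK classes already in the question, plus DataflowPipelineOptions:

// Read the project from the pipeline options and reuse it as the Datastore dataset ID,
// so the Dataflow job and the Datastore it reads always live in the same project.
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);
Pipeline p = Pipeline.create(options);

// query is the same Query built in the question's main method.
DatastoreIO.Source source = DatastoreIO.source()
    .withDataset(options.getProject())
    .withQuery(query);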

In your case, however, you are hitting the following bug:

Is the associated App Engine project locked down to users of a specific domain? If so, there is an issue in the current beta of Cloud Datastore that prevents the Dataflow service account (email ending in, e.g., @cloudservices.gserviceaccount.com) from accessing the data.

There is a temporary workaround we can apply, at a minor cost if you are using the OAuth API: it would no longer enforce that the user comes from your app's domain. If that's an important requirement for you, you can do the domain enforcement in your own code instead; a sketch follows at the end of this answer. (The regular Users API is not affected.)

To request that we apply the temporary workaround, you can email us at [email protected] referencing this issue and including your numeric project ID.
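
For completeness, here is a minimal sketch of what doing the domain enforcement in your own application code could look like, assuming the standard App Engine Users API and using example.com as a stand-in for your domain:

import com.google.appengine.api.users.User;
import com.google.appengine.api.users.UserService;
import com.google.appengine.api.users.UserServiceFactory;

// Sketch only: allow a request through when the signed-in user's email is in the
// allowed domain. "example.com" is a placeholder for your app's actual domain.
public class DomainCheck {
    public static boolean isAllowedUser() {
        UserService userService = UserServiceFactory.getUserService();
        User user = userService.getCurrentUser();
        return user != null && user.getEmail().endsWith("@example.com");
    }
}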