I have a Google App Engine application where data is stored in the Google Cloud Datastore. I want to use Dataflow to put part of that data into BigQuery, but I thought I'd start with just getting some information from the Datastore and writing it to Google Cloud Storage. My code looks like this:
// Imports assume the Dataflow SDK 1.x and the Datastore v1beta2 client.
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.api.services.datastore.DatastoreV1.Value;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class DatastorePipeline {

    private static final Logger LOG = LoggerFactory.getLogger(DatastorePipeline.class);

    // Extracts the "email_address" property from each Datastore entity.
    static class GetEmailFn extends DoFn<Entity, String> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            Map<String, Value> properties = DatastoreHelper.getPropertyMap(c.element());
            Value value = properties.get("email_address");
            if (value != null) {
                c.output(DatastoreHelper.getString(value));
            }
        }
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // Query all entities of kind "User".
        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName("User");
        Query query = q.build();

        DatastoreIO.Source source = DatastoreIO.source()
            .withDataset("my-project-id")
            .withQuery(query);

        p.apply("ReadUsersFromDatastore", Read.from(source))
         .apply(ParDo.named("GetEmailAddress").of(new GetEmailFn()))
         .apply(TextIO.Write.to("gs://dataflow-output-bucket/emails.txt"));

        p.run();
    }
}
However, when I try to run this, I get 403 errors when making the Datastore query:
Request failed with code 403, will NOT retry: https://www.googleapis.com/datastore/v1beta2/datasets/my-project-id/runQuery
I'm running this from Eclipse with the Google Cloud Dataflow plugin. Dataflow jobs that don't include Datastore reads run just fine. I ran
gcloud auth login
before running the job, as described in the tutorial. What am I doing wrong?
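For reference, the authentication steps I ran before launching the job looked roughly like this (the project ID is a placeholder for my actual project):

```shell
# Authenticate the gcloud SDK with my Google account
gcloud auth login

# Make sure gcloud points at the project that owns the Datastore
gcloud config set project my-project-id
```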
Edit: here is the full stack trace.
Oct 11, 2015, 12:03:13 PM (b6119cca307b4d9a): com.google.api.services.datastore.client.DatastoreException: Unauthorized.
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.api.services.datastore.client.QuerySplitterImpl.getScatterKeys(QuerySplitterImpl.java:189)
    at com.google.api.services.datastore.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:75)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.getSplitQueries(DatastoreIO.java:427)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.splitIntoBundles(DatastoreIO.java:306)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSplit(BasicSerializableSourceFormat.java:318)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSourceOperation(BasicSerializableSourceFormat.java:167)
    at com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.execute(SourceOperationExecutor.java:80)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:257)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:146)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:164)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:145)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.http.HttpResponseException: 403 Forbidden
Unauthorized.
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1061)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 19 more
Answered: It turned out that the problem was that my project was restricting access based on my company's domain, which prevented the service account from connecting. Thanks, Dan, for helping me work through it!
Are you running with the DataflowPipelineRunner (or the Blocking variant)? Does the same error occur with the DirectPipelineRunner? - Dan Halperin
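For context, in the Dataflow SDK 1.x the runner is chosen via the --runner pipeline option, so switching between local and service execution is just a matter of program arguments. A sketch of both invocations (the jar name and staging bucket are placeholders, not from the original post):

```shell
# Run the pipeline locally with the DirectPipelineRunner
# (uses local credentials, no Dataflow service involved)
java -cp target/my-pipeline.jar DatastorePipeline \
  --runner=DirectPipelineRunner \
  --project=my-project-id

# Run on the Dataflow service, blocking until the job finishes
java -cp target/my-pipeline.jar DatastorePipeline \
  --runner=BlockingDataflowPipelineRunner \
  --project=my-project-id \
  --stagingLocation=gs://dataflow-output-bucket/staging
```

Running locally first is a useful way to separate credential problems on your own machine from permission problems affecting the service accounts used by the Dataflow workers.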