
I am new to the pipeline world and the Google Cloud Dataflow API.

I want to read data from BigQuery with a SQL query. When I read the whole table, it works fine:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .from("test:DataSetTest.data"));

But when I use fromQuery, I get an error:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .fromQuery("SELECT * FROM DataSetTest.data"));

Error:

Exception in thread "main" java.lang.IllegalArgumentException: Validation of query "SELECT * FROM DataSetTest.data" failed. If the query depends on an earlier stage of the pipeline, This validation can be disabled using #withoutValidation.
    at com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:449)
    at com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.validate(BigQueryIO.java:432)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:357)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
    at com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:47)
    at com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:151)
    at Test.java.packageid.StarterPipeline.main(StarterPipeline.java:72)
Caused by: java.lang.NullPointerException: Required parameter projectId must be specified.
    at com.google.api.client.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
    at com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:140)
    at com.google.api.services.bigquery.Bigquery$Jobs$Query.<init>(Bigquery.java:1751)
    at com.google.api.services.bigquery.Bigquery$Jobs.query(Bigquery.java:1724)
    at com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:445)
    ... 6 more

What is the problem here?

UPDATE:

I set the project via options.setProject:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
options.setProject("test");
PCollection<TableRow> qData = p.apply(
     BigQueryIO.Read
         .named("Read")
         .fromQuery("SELECT * FROM DataSetTest.data"));

But now I get a different error: the table is not found.

Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table test:_dataflow_temporary_dataset_737099.dataflow_temporary_table_550832",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table test:_dataflow_temporary_dataset_737099.dataflow_temporary_table_550832"
}


1 Answer


All resources in Google Cloud Platform, including BigQuery tables and Dataflow jobs, are associated with a cloud project. Specifying the project is necessary when interacting with GCP resources.

The exception trace says that no cloud project is set for the BigQueryIO.Read transform: Caused by: java.lang.NullPointerException: Required parameter projectId must be specified.

Dataflow controls the default value of the cloud project via its PipelineOptions API. That project is then used as the default across Dataflow's APIs, including BigQueryIO.

Normally, we recommend constructing the PipelineOptions from command-line arguments using the PipelineOptionsFactory.fromArgs(String[]) API. In this case, you'd just pass --project=YOUR_PROJECT on the command line.
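
For example, a minimal sketch of a main method using command-line options (it reuses your query; imports are omitted for brevity, and YOUR_PROJECT is a placeholder for your actual cloud project ID):

// Run with: --project=YOUR_PROJECT
public static void main(String[] args) {
    // Parse --project (and any other flags) from the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> qData = p.apply(
        BigQueryIO.Read
            .named("Read")
            .fromQuery("SELECT * FROM DataSetTest.data"));

    p.run();
}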

Alternatively, this can be set manually in the code, as follows:

GcpOptions gcpOptions = options.as(GcpOptions.class);
gcpOptions.setProject("YOUR_PROJECT");
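
Putting it together with your pipeline, a sketch of the fix; the project must be set before the BigQueryIO.Read transform is applied, since the stack trace shows the query is validated at apply time (YOUR_PROJECT is a placeholder for your actual cloud project ID):

PipelineOptions options = PipelineOptionsFactory.create();
// Set the cloud project before any transform that needs it is applied.
options.as(GcpOptions.class).setProject("YOUR_PROJECT");
Pipeline p = Pipeline.create(options);

PCollection<TableRow> qData = p.apply(
    BigQueryIO.Read
        .named("Read")
        .fromQuery("SELECT * FROM DataSetTest.data"));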

Finally, starting with version 1.4.0 of the Dataflow SDK for Java, Dataflow will default to the cloud project set via gcloud config set project <project>. You can still override it via PipelineOptions, but you don't need to. This may have worked in some scenarios even before version 1.4.0, but it may not have been reliable across all combinations of Cloud SDK and Dataflow SDK versions.