I am new to the pipeline world and the Google Cloud Dataflow API. I want to read data from BigQuery with a SQL query. When I read the whole table, it works fine:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

PCollection<TableRow> qData = p.apply(
    BigQueryIO.Read
        .named("Read")
        .from("test:DataSetTest.data"));
But when I use fromQuery, I get an error:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

PCollection<TableRow> qData = p.apply(
    BigQueryIO.Read
        .named("Read")
        .fromQuery("SELECT * FROM DataSetTest.data"));
Error:
Exception in thread "main" java.lang.IllegalArgumentException: Validation of query "SELECT * FROM DataSetTest.data" failed. If the query depends on an earlier stage of the pipeline, This validation can be disabled using #withoutValidation.
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:449)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.validate(BigQueryIO.java:432)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:357)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:47)
at com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:151)
at Test.java.packageid.StarterPipeline.main(StarterPipeline.java:72)
Caused by: java.lang.NullPointerException: Required parameter projectId must be specified.
at com.google.api.client.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
at com.google.api.client.util.Preconditions.checkNotNull(Preconditions.java:140)
at com.google.api.services.bigquery.Bigquery$Jobs$Query.(Bigquery.java:1751)
at com.google.api.services.bigquery.Bigquery$Jobs.query(Bigquery.java:1724)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Read$Bound.dryRunQuery(BigQueryIO.java:445)
... 6 more
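The message mentions #withoutValidation. I have not tried it yet, but from the error text I assume disabling the dry-run validation would look roughly like this (just a sketch):

// Sketch only: skip the dry-run validation that fails above.
PCollection<TableRow> qData = p.apply(
    BigQueryIO.Read
        .named("Read")
        .fromQuery("SELECT * FROM DataSetTest.data")
        .withoutValidation());

But the root cause seems to be the NullPointerException about the missing projectId.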
What is the problem here?
UPDATE:
I set the project with options.setProject:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
options.setProject("test");

PCollection<TableRow> qData = p.apply(
    BigQueryIO.Read
        .named("Read")
        .fromQuery("SELECT * FROM DataSetTest.data"));
But now I get this message saying the table is not found:
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table test:_dataflow_temporary_dataset_737099.dataflow_temporary_table_550832",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table test:_dataflow_temporary_dataset_737099.dataflow_temporary_table_550832"
}
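My guess is that the project has to be set on the options before the pipeline is created, and that the table in the query may need to be fully qualified with the project. Something along these lines (a sketch, with "test" standing in for my real project ID; not verified):

// Sketch: set the project before creating the pipeline,
// and fully qualify the table with the project (legacy SQL bracket syntax).
DataflowPipelineOptions options =
    PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("test");

Pipeline p = Pipeline.create(options);

PCollection<TableRow> qData = p.apply(
    BigQueryIO.Read
        .named("Read")
        .fromQuery("SELECT * FROM [test:DataSetTest.data]"));

Is this the right way to configure it, or is the problem somewhere else?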