Interesting question. There are some good ideas already, but I'd like to show another possibility that uses just Dataflow and BigQuery. If this is a non-templated batch job, we can use PipelineResult.waitUntilFinish(), which:
"Waits until the pipeline finishes and returns the final status."
Then we check whether the State is DONE and proceed with the MERGE statement if needed:
PipelineResult res = p.run();
res.waitUntilFinish();
if (res.getState() == PipelineResult.State.DONE) {
    LOG.info("Dataflow job is finished. Merging results...");
    MergeResults();
    LOG.info("All done :)");
}
In order to test this we can create a BigQuery table (upsert.full) which will contain the final results and be updated on each run:
bq mk upsert
bq mk -t upsert.full name:STRING,total:INT64
bq query --use_legacy_sql=false "INSERT upsert.full (name, total) VALUES('tv', 10), ('laptop', 20)"
At the start we populate it with a total of 10 TVs (and 20 laptops). Now let's imagine that we sell 5 extra TVs and, in our Dataflow job, we write a single row to a temporary table (upsert.temp) with the new corrected value (15):
p
    .apply("Create Data", Create.of("Start"))
    .apply("Write", BigQueryIO
        .<String>write()
        .to(output)
        .withFormatFunction(
            (String dummy) ->
                new TableRow().set("name", "tv").set("total", 15))
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withSchema(schema));
So now we want to update the original table with the following query (DML syntax):
MERGE upsert.full F
USING upsert.temp T
ON T.name = F.name
WHEN MATCHED THEN
UPDATE SET total = T.total
WHEN NOT MATCHED THEN
INSERT(name, total)
VALUES(name, total)
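To make the semantics of that statement concrete, here is a minimal sketch that mimics it with plain Java maps instead of BigQuery tables. The class and method names (MergeSketch, merge) are made up for illustration, and it assumes name is unique in both tables, which is what the MERGE needs anyway to match each target row at most once.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the two tables: name -> total.
class MergeSketch {

    // Mimics the MERGE: names already present in `full` get their total
    // overwritten (WHEN MATCHED), unseen names are added (WHEN NOT MATCHED).
    static Map<String, Long> merge(Map<String, Long> full, Map<String, Long> temp) {
        Map<String, Long> merged = new LinkedHashMap<>(full);
        merged.putAll(temp);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> full = new LinkedHashMap<>();
        full.put("tv", 10L);
        full.put("laptop", 20L);

        Map<String, Long> temp = new LinkedHashMap<>();
        temp.put("tv", 15L);

        System.out.println(merge(full, temp)); // prints {tv=15, laptop=20}
    }
}
```

Rows that only exist in the target (laptop here) are left untouched, since the statement has no WHEN NOT MATCHED BY SOURCE clause.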
To run this query from the pipeline code, we can use BigQuery's Java Client Library in MergeResults:
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
            "MERGE upsert.full F "
            + ...
            + "VALUES(name, total)")
        .setUseLegacySql(false)
        .build();
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
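If you'd rather not concatenate the query text by hand, one option is a small helper that assembles the same MERGE from table and column names. This is only a sketch: MergeSqlSketch and buildMergeSql are hypothetical names, not part of any library, and the identifiers are concatenated without escaping, so it should only be fed trusted, hard-coded names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the BigQuery client library.
class MergeSqlSketch {

    // Builds a MERGE shaped like the one shown earlier. `key` is the join
    // column; every other column is overwritten when a row matches.
    static String buildMergeSql(String target, String source, String key, List<String> columns) {
        String updates = columns.stream()
            .filter(c -> !c.equals(key))
            .map(c -> c + " = T." + c)
            .collect(Collectors.joining(", "));
        if (updates.isEmpty()) {
            throw new IllegalArgumentException("need at least one non-key column to update");
        }
        String cols = String.join(", ", columns);
        return "MERGE " + target + " F "
            + "USING " + source + " T "
            + "ON T." + key + " = F." + key + " "
            + "WHEN MATCHED THEN "
            + "UPDATE SET " + updates + " "
            + "WHEN NOT MATCHED THEN "
            + "INSERT(" + cols + ") "
            + "VALUES(" + cols + ")";
    }

    public static void main(String[] args) {
        System.out.println(
            buildMergeSql("upsert.full", "upsert.temp", "name", Arrays.asList("name", "total")));
    }
}
```

The returned string would then be passed to QueryJobConfiguration.newBuilder(...) in place of the hand-written one; the rest of MergeResults stays unchanged.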
The MergeResults code is based on this snippet, which includes some basic error handling. Note that you'll need to add this dependency to your pom.xml or equivalent:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
    <version>1.82.0</version>
</dependency>
And it works for me:
INFO: 2020-02-08T11:38:56.292Z: Worker pool stopped.
Feb 08, 2020 12:39:04 PM org.apache.beam.runners.dataflow.DataflowPipelineJob logTerminalState
INFO: Job 2020-02-08_REDACTED finished with status DONE.
Feb 08, 2020 12:39:04 PM org.apache.beam.examples.BigQueryUpsert main
INFO: Dataflow job is finished. Merging results...
Feb 08, 2020 12:39:09 PM org.apache.beam.examples.BigQueryUpsert main
INFO: All done :)
$ bq query --use_legacy_sql=false "SELECT name,total FROM upsert.full LIMIT 10"
+--------+-------+
|  name  | total |
+--------+-------+
| tv     |    15 |
| laptop |    20 |
+--------+-------+
Tested with the 2.17.0 Java SDK and both the Direct and Dataflow runners.
Full example here