2
votes

Whenever a file is written to Cloud Storage, I want it to trigger a Cloud Function that executes a DataFlow template to transform the file content and write the results to BigQuery.

I think I have a handle on that much, for the most part. The problem is that I don't just need to insert into a BQ table; I need to upsert (using the MERGE operation). This seems like it would be a common requirement, but the Apache Beam BigQuery connector doesn't offer this option (its write dispositions are only WRITE_APPEND, WRITE_EMPTY and WRITE_TRUNCATE).

So then I thought... OK, if I can just capture when the DataFlow pipeline is done executing, I could have DataFlow write to a temporary table and then I could call a SQL Merge query to merge data from the temp table to the target table. However, I'm not seeing any way to trigger a cloud function upon pipeline execution completion.

Any suggestions on how to accomplish the end goal?

Thanks

3 Answers

1
votes

There is no native, built-in way to generate an event when a Dataflow job ends. However, you can work around this by using the logs.

To do this:

  • Go to the logs, select the advanced filter (the arrow on the right of the filter bar) and paste this custom filter:
resource.type="dataflow_step" textPayload="Worker pool stopped."

You should now see only the end-of-job entries for your Dataflow jobs. Next, create a sink that exports these entries to PubSub, then attach your function to those PubSub messages and do whatever you need.

To do this, after filling in your custom filter:

  • Click on create sink
  • Set a sink name
  • Set the destination to PubSub
  • Select your topic
  • Now attach a function to this topic; it will be triggered only when a Dataflow job ends.
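
Inside the function, the PubSub message's data field carries the exported log entry as base64-encoded JSON. Here is a minimal sketch of the check the function could run before doing any work. The class and method names are hypothetical, it only uses the JDK, and the regex-based parsing is a simplification that assumes the payload contains no escaped quotes; a real function would use a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: decides whether a PubSub message from the log sink
// is the "Worker pool stopped." entry matched by the filter above.
final class DataflowEndDetector {
    // Matches "textPayload": "<value>" in the exported log entry JSON.
    // Simplification: assumes the value contains no escaped quotes.
    private static final Pattern TEXT_PAYLOAD =
        Pattern.compile("\"textPayload\"\\s*:\\s*\"([^\"]*)\"");

    /** Decodes the base64 data field into the log entry JSON string. */
    static String decode(String base64Data) {
        return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
    }

    /** Returns true if the log entry text is exactly "Worker pool stopped.". */
    static boolean isJobFinished(String base64Data) {
        Matcher m = TEXT_PAYLOAD.matcher(decode(base64Data));
        return m.find() && m.group(1).equals("Worker pool stopped.");
    }

    public static void main(String[] args) {
        // Simulated message body, shaped like an exported log entry.
        String logEntry =
            "{\"resource\":{\"type\":\"dataflow_step\"},\"textPayload\":\"Worker pool stopped.\"}";
        String data = Base64.getEncoder()
            .encodeToString(logEntry.getBytes(StandardCharsets.UTF_8));
        if (isJobFinished(data)) {
            System.out.println("Dataflow job finished; run the MERGE here.");
        }
    }
}
```

The sink filter already restricts what reaches the topic, so this check is only a safety net in case the topic is shared with other publishers.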
1
votes

I have implemented this exact use case, but instead of using two different pipelines, you can create just one.

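Step 1: Read the file from GCS and convert each line into a TableRow.

Step 2: Read all the existing rows of the target table from BigQuery.

Step 3: Create a single ParDo that holds your custom upsert logic. Its two inputs, keyed by the merge key, are read like this:

// Existing rows from BigQuery (KeyByIdFn is a placeholder DoFn<TableRow, KV<String, TableRow>>).
PCollection<KV<String, TableRow>> existing = p.apply(BigQueryIO.readTableRows().from("")).apply(ParDo.of(new KeyByIdFn()));

// New rows parsed from the file (ConvertToKeyedTableRowFn is a placeholder DoFn<String, KV<String, TableRow>>).
PCollection<KV<String, TableRow>> incoming = p.apply(TextIO.read().from("")).apply(ParDo.of(new ConvertToKeyedTableRowFn()));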

Step 4: Apply CoGroupByKey to the two collections, then run the ParDo on the result to produce the updated rows (the equivalent of a MERGE operation).

Step 5: Write the complete set of TableRows to BQ using WRITE_TRUNCATE mode. The code is a little more complicated this way, but it should perform better as a single pipeline.
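
To make the merge semantics of steps 4 and 5 concrete, here is a plain-Java sketch with no Beam dependency. It is an illustration under assumptions, not the pipeline code: the class and method names are hypothetical, and in Beam the same decision would be made per key inside the ParDo that follows CoGroupByKey.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of upsert semantics using in-memory maps.
final class UpsertSketch {
    /**
     * Merges incoming rows into existing rows by key: matching keys are
     * overwritten (UPDATE), new keys are added (INSERT), and untouched
     * existing rows are kept so the result can replace the whole table.
     */
    static <K, V> Map<K, V> upsert(Map<K, V> existing, Map<K, V> incoming) {
        Map<K, V> merged = new LinkedHashMap<>(existing);
        merged.putAll(incoming); // incoming values win on key collisions
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> existing = new LinkedHashMap<>();
        existing.put("tv", 10);
        existing.put("laptop", 20);

        Map<String, Integer> incoming = new LinkedHashMap<>();
        incoming.put("tv", 15);

        System.out.println(upsert(existing, incoming)); // prints {tv=15, laptop=20}
    }
}
```

Because the merged output contains every row, not just the changed ones, it is safe to write it back with WRITE_TRUNCATE.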

1
votes

Interesting question. There are some good ideas already, but I'd like to show another possibility that uses just Dataflow and BigQuery. If this is a non-templated batch job, we can use PipelineResult.waitUntilFinish(), which:

Waits until the pipeline finishes and returns the final status.

Then we check whether the state is DONE and, if so, proceed with the MERGE statement:

PipelineResult res = p.run();
res.waitUntilFinish();

if (res.getState() == PipelineResult.State.DONE) {
    LOG.info("Dataflow job is finished. Merging results...");
    MergeResults();
    LOG.info("All done :)");
}

In order to test this we can create a BigQuery table (upsert.full) which will contain the final results and be updated each run:

bq mk upsert
bq mk -t upsert.full name:STRING,total:INT64
bq query --use_legacy_sql=false "INSERT upsert.full (name, total) VALUES('tv', 10), ('laptop', 20)"

At the start, we populate it with 10 TVs and 20 laptops. Now let's imagine that we sell 5 extra TVs and that our Dataflow job writes a single row to a temporary table (upsert.temp) with the new corrected value (15):

p
.apply("Create Data", Create.of("Start"))
.apply("Write", BigQueryIO
                .<String>write()
                .to(output)
                .withFormatFunction(
                    (String dummy) ->
                    new TableRow().set("name", "tv").set("total", 15))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withSchema(schema));

So now we want to update the original table with the following query (DML syntax):

MERGE upsert.full F
USING upsert.temp T
ON T.name = F.name
WHEN MATCHED THEN
  UPDATE SET total = T.total
WHEN NOT MATCHED THEN
  INSERT(name, total)
  VALUES(name, total)
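
If the same pattern is needed for several tables, the statement can be assembled from the table names, the key and the column list. The following is a sketch only: the helper name and its parameters are hypothetical, and because identifiers are concatenated directly into the SQL it should only ever be called with trusted, hard-coded names.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that builds a MERGE statement shaped like the one above.
final class MergeSqlBuilder {
    static String buildMergeSql(String target, String source, String key, List<String> columns) {
        // Every non-key column is overwritten from the source row on a match.
        String updates = columns.stream()
            .filter(c -> !c.equals(key))
            .map(c -> c + " = T." + c)
            .collect(Collectors.joining(", "));
        String cols = String.join(", ", columns);
        return "MERGE " + target + " F "
            + "USING " + source + " T "
            + "ON T." + key + " = F." + key + " "
            + "WHEN MATCHED THEN UPDATE SET " + updates + " "
            + "WHEN NOT MATCHED THEN INSERT(" + cols + ") VALUES(" + cols + ")";
    }

    public static void main(String[] args) {
        System.out.println(
            buildMergeSql("upsert.full", "upsert.temp", "name", List.of("name", "total")));
    }
}
```

With the arguments shown in main, the output is the same statement as above, collapsed onto a single line.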

Therefore, we can use BigQuery's Java Client Library in MergeResults:

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
          "MERGE upsert.full F "
        + ...
        + "VALUES(name, total)")
        .setUseLegacySql(false)
        .build();

JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

This is based on this snippet which includes some basic error handling. Note that you'll need to add this to your pom.xml or equivalent:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquery</artifactId>
  <version>1.82.0</version>
</dependency>

and it works for me:

INFO: 2020-02-08T11:38:56.292Z: Worker pool stopped.
Feb 08, 2020 12:39:04 PM org.apache.beam.runners.dataflow.DataflowPipelineJob logTerminalState
INFO: Job 2020-02-08_REDACTED finished with status DONE.
Feb 08, 2020 12:39:04 PM org.apache.beam.examples.BigQueryUpsert main
INFO: Dataflow job is finished. Merging results...
Feb 08, 2020 12:39:09 PM org.apache.beam.examples.BigQueryUpsert main
INFO: All done :)
$ bq query --use_legacy_sql=false "SELECT name,total FROM upsert.full LIMIT 10"
+--------+-------+
|  name  | total |
+--------+-------+
| tv     |    15 |
| laptop |    20 |
+--------+-------+

Tested with the 2.17.0 Java SDK and both the Direct and Dataflow runners.

Full example here