
Below is the code that I'm using to write data to BigQuery

WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of())
            .apply("BQ StreamingInserts", BigQueryIO.writeTableRows()
                    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                    .withFormatFunction(new TableRowFormatFn())
                    .to(new DestinationMapper())
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .withoutValidation()
                    .withExtendedErrorInfo());

The code handles all the schema-related issues, but when the table does not exist in BigQuery, it keeps retrying the inserts indefinitely, which stalls the pipeline.

Below is the error reported by Dataflow:

java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table analytics-and-presentation:trusted_layer_ods.wrong_table",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table analytics-and-presentation:trusted_layer_ods.wrong_table",
  "status" : "NOT_FOUND"
}

Can someone help?


1 Answer


This looks like expected behavior, as Dataflow will retry indefinitely in the case of a streaming pipeline. In a batch pipeline, the job would have failed after 4 tries.
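Since your code already calls withExtendedErrorInfo(), you can at least surface row-level insert failures instead of letting them fail silently. Below is a minimal sketch against the Beam Java SDK (the step name and log format are illustrative, not part of your pipeline):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Rows rejected with a non-transient, row-level error are emitted here
// instead of being retried forever. Note that this does not catch the
// missing-table 404, which is retried at the request level.
result.getFailedInsertsWithErr()
        .apply("LogFailedInserts", MapElements
                .into(TypeDescriptors.strings())
                .via((BigQueryInsertError err) -> String.format(
                        "Insert failed for table %s: %s",
                        err.getTable(), err.getError())));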

You must clearly define in your code what should happen when the destination table is missing. You can get inspiration from the examples found on the official Google Cloud Platform GitHub page.

With your present code, you must make sure the table is created in advance to avoid this error.
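For example, here is a minimal sketch that pre-creates the table with the google-cloud-bigquery client library before the pipeline is launched (the table name and schema below are placeholders, not taken from your pipeline):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

// Placeholder identifiers -- substitute your own project, dataset and table.
TableId tableId = TableId.of("analytics-and-presentation", "trusted_layer_ods", "my_table");

// Create the table only if it does not exist yet.
if (bigquery.getTable(tableId) == null) {
    // Placeholder schema -- it must match the TableRows your pipeline writes.
    Schema schema = Schema.of(
            Field.of("id", StandardSQLTypeName.STRING),
            Field.of("payload", StandardSQLTypeName.STRING));
    bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
}

Alternatively, if you want the pipeline to create missing tables itself, you can switch the CreateDisposition to CREATE_IF_NEEDED and supply a schema (if your DestinationMapper extends DynamicDestinations, via its getSchema() method).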