
I have a Dataflow pipeline running locally. The objective is to read a JSON file using TextIO, build sessions, and load the result into BigQuery. Given this structure, I have to stage the data in a temp directory in GCS and then load it into BigQuery from there. Previously I had a data schema error that prevented me from loading the data (see here); that issue is resolved.

So now, when I run the pipeline locally, it ends with dumping a temporary newline-delimited JSON file into GCS. The SDK then gives me the following:

Starting BigQuery load job beam_job_xxxx_00001-1: try 1/3
INFO [main] (BigQueryIO.java:2191) - BigQuery load job failed: beam_job_xxxx_00001-1
...
Exception in thread "main" com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:187)
at pedesys.Dataflow.main(Dataflow.java:148)
Caused by: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.load(BigQueryIO.java:2198)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.processElement(BigQueryIO.java:2146)

The errors are not very descriptive, and the data is still not loaded into BigQuery. What is puzzling is that if I go to the BigQuery UI and manually load the same temporary file that the pipeline dumped into GCS, into the same table, it works beautifully.

The relevant code parts are as follows:

PipelineOptions options = PipelineOptionsFactory.create();
    options.as(BigQueryOptions.class)
            .setTempLocation("gs://test/temp");
    Pipeline p = Pipeline.create(options);
...

...
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write
      .named("loadJob")
      .to("myproject:db.table")
      .withSchema(schema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      );
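
For reference, FormatAsTableRowFn just converts each session record into a TableRow, and schema is the matching TableSchema passed to withSchema(). The sketch below is simplified; the field names are placeholders, not my actual fields:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import java.util.Arrays;

    // Converts one session record (a String element here) into a TableRow.
    // The field names below are placeholders only.
    static class FormatAsTableRowFn extends DoFn<String, TableRow> {
      @Override
      public void processElement(ProcessContext c) {
        TableRow row = new TableRow()
            .set("session_id", c.element())
            .set("event_count", 1);
        c.output(row);
      }
    }

    // The TableSchema must match the TableRow fields (names and types) exactly;
    // a mismatch here is a common reason for a load job to fail.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("session_id").setType("STRING"),
        new TableFieldSchema().setName("event_count").setType("INTEGER")));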
The first thing to do is find out what the error actually is. Take the BigQuery load job id (beam_job_<xxxx>_00001-1) and either get the details from the command line (bq show -j beam_job_<xxxx>_00001-1), or via your browser using "try it" at the bottom of the jobs.get reference page (cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get). Then you'll know more details. - Graham Polley
I filed issues.apache.org/jira/browse/BEAM-1235 to improve the error reporting. - jkff
Thank you @jkff, that would be very helpful, and would have prevented this all in the first place :) - maininformer
@GrahamPolley Thank you very much for this information. I debugged it from there; two things learned, however. Since there is a "job history" section in the BigQuery UI, I assumed it was an exhaustive list, and since my SDK load jobs were not listed there I did not look for another way to get the job details. It would be nice to also have that job lookup linked from the UI job history. - maininformer
@plumSemPy - Terrific news! OK, I'll add an answer now so that others know what to do if they hit the same problem. - Graham Polley

1 Answer


The SDK is swallowing the error/exception and not reporting it to users. It's most likely a schema problem. To get the actual error, you need to fetch the load job details by either:

  1. CLI: bq show -j beam_job_<xxxx>_00001-1
  2. Browser/Web: use "try it" at the bottom of the page here.
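
If you'd rather do this programmatically, below is a minimal sketch, assuming the google-api-services-bigquery client library and Application Default Credentials; the project and job ids are placeholders you'd replace with your own:

    import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.BigqueryScopes;
    import com.google.api.services.bigquery.model.ErrorProto;
    import com.google.api.services.bigquery.model.Job;

    public class ShowLoadJobErrors {
      public static void main(String[] args) throws Exception {
        // Placeholder project and job ids -- substitute your own values.
        String projectId = "myproject";
        String jobId = "beam_job_xxxx_00001-1";

        GoogleCredential credential = GoogleCredential.getApplicationDefault()
            .createScoped(BigqueryScopes.all());
        Bigquery bigquery = new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                credential)
            .setApplicationName("load-job-debug")
            .build();

        // jobs.get returns the job resource, including status and error details.
        Job job = bigquery.jobs().get(projectId, jobId).execute();
        System.out.println("State: " + job.getStatus().getState());
        if (job.getStatus().getErrorResult() != null) {
          System.out.println("Error result: " + job.getStatus().getErrorResult().getMessage());
        }
        if (job.getStatus().getErrors() != null) {
          for (ErrorProto e : job.getStatus().getErrors()) {
            System.out.println("Error: " + e.getReason() + " - " + e.getMessage());
          }
        }
      }
    }

The job status carries the overall failure in errorResult and the individual messages (for example per-field schema mismatches) in errors, which is the same information the UI shows for a failed load.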

@jkff has raised an issue here to improve the error reporting.