2
votes

I am running a job on Google Dataflow, written with Apache Beam, that reads from a BigQuery table and from files, transforms the data, and writes it into other BigQuery tables. The job "usually" succeeds, but sometimes I randomly get a NullPointerException when reading from the BigQuery table, and the job fails:

(288abb7678892196): java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.split(BigQuerySourceBase.java:98)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.splitAndValidate(WorkerCustomSources.java:261)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitTyped(WorkerCustomSources.java:209)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitWithApiLimit(WorkerCustomSources.java:184)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplit(WorkerCustomSources.java:161)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSourceOperationExecutor.execute(WorkerCustomSourceOperationExecutor.java:47)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:341)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:297)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I cannot figure out what this is connected to. When I clear the temp directory and re-upload my template, the job passes again.

The way I read from BQ is simply with:

BigQueryIO.read().fromQuery()
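
(Roughly, in fuller form the read looks like the sketch below. The query string here is only a placeholder; the actual query is built from a pipeline option, as shown in the code in my answer with more details further down.)

// Placeholder query for illustration only -- the real one comes from a ValueProvider
// (see my answer below).
PCollection<TableRow> rows = p.apply("Read Transactions",
        BigQueryIO.read().fromQuery("SELECT transaction_id FROM my_dataset.transactions"));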

I would greatly appreciate any help.

Anyone?

3
Are you adding an actual query to your source? Or are you just calling fromQuery() without any parameters? Also, Read is not a function, but an internal class. – Pablo
Are you running the pipeline directly, or using the runner to create a template and then running that? – Ben Chambers
Why don't you just run it locally and debug it? – Graham Polley
Are you doing any ParDo transform on the data read from BigQuery? If yes, please provide the code snippet showing exactly where you are getting the NullPointerException. – Manoj Kumar

3 Answers

3
votes

I ended up filing a bug in the Google issue tracker. After a longer conversation with a Google employee and their investigation, it turned out that it doesn't make sense to use templates with Dataflow batch jobs that read from BigQuery, because such a template can only be executed once.

To quote: "for BigQuery batch pipelines, templates can only be executed once, as the BigQuery job ID is set at template creation time. This restriction will be removed in a future release for the SDK 2, but when I cannot say. Creating Templates: https://cloud.google.com/dataflow/docs/templates/creating-templates#pipeline-io-and-runtime-parameters"

It would still be good if the error were clearer than a NullPointerException.

Anyway, I hope this helps someone in the future.

Here is the issue if someone is interested in whole conversation: https://issuetracker.google.com/issues/63124894

2
votes

I ran into this issue as well, and after digging around it turns out that the restriction has been removed in version 2.2.0. However, that version has not been officially released yet. You can follow its progress on their JIRA project (it seems there's only one issue left).

But if you want to use it now, you can compile it yourself, which isn't difficult: check out the source code from their GitHub mirror, check out the tag v2.2.0-RC4, and run mvn clean install. Then modify your project's dependencies in pom.xml to point to version 2.2.0 instead.

From 2.2.0 onwards, if you want to use BigQueryIO in a template, you need to call withTemplateCompatibility():

BigQueryIO
    .readTableRows() // read() has been deprecated in 2.2.0
    .withTemplateCompatibility() // You need to add this
    .fromQuery(options.getInputQuery())

I'm currently using 2.2.0 for my project, and it works fine so far.
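
For completeness, here is a minimal sketch of how such a template-compatible read could be wired up end to end. The options interface and the inputQuery parameter are illustrative only, not taken from the question:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.PCollection;

public class TemplateCompatibleRead {

    // Runtime parameter, so the query is resolved at execution time
    // instead of being baked into the template.
    public interface Options extends PipelineOptions {
        ValueProvider<String> getInputQuery();
        void setInputQuery(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline p = Pipeline.create(options);

        PCollection<TableRow> rows = p.apply("Read from BigQuery",
                BigQueryIO.readTableRows()
                        .withTemplateCompatibility() // allows the template to be executed more than once
                        .fromQuery(options.getInputQuery()));

        // ... further transforms and a write would go here ...

        p.run();
    }
}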

1
vote

OK, let me give a bit more detail.

  • The job is uploaded as a template and run on Google Dataflow.
  • The job usually succeeds - that's why I doubt there is something wrong with the actual code. The exception comes from the source; it looks like bqServices.getDatasetService(bqOptions) returns null in BigQuerySourceBase.
  • Yes, I do provide the actual query.

Below is the DAG of my job. As you can see, this run succeeded. It processed more than 2 million rows exported from BQ and 1.5 million rows from CSV files, and wrote 800k rows back to BigQuery (the numbers are correct). The job basically works as expected (when it works). The top-left step ("Read Transactions") is the one that runs the query on BQ, and it is that step that sometimes fails for no apparent reason.

Successful run - Beam DAG

Below is the same job when it failed with the NullPointerException on the BQ source.

Failed run - Beam DAG

I'm not sure how helpful a code snippet will be in this case, but this is the part that runs the query:

PCollection<Transaction> transactions = p
        .apply("Read Transactions", BigQueryIO.read().fromQuery(createTransactionQuery(options)))
        .apply("Map to Transaction", MapElements.via(new TableRowToTransactionFn()));

PCollection<KV<String, Transaction>> transactionsPerMtn = transactions
        .apply("Filter Transactions Without MTN", Filter.by(t -> t.transactionMtn != null))
        .apply("Map Transactions to MTN key", MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Transaction.class)))
                .via(t -> KV.of(t.transactionMtn, t)));

And below is the method that builds the query:

private ValueProvider<String> createTransactionQuery(TmsPipelineOptions options) {
    return NestedValueProvider.of(options.getInputTransactionTable(), table -> {
        StringBuilder sb = new StringBuilder();
        sb.append(
                "SELECT transaction_id, transaction_mtn, transaction_folio_number, transaction_payer_folio_number FROM ");
        sb.append(table);
        return sb.toString();
    });
}

I believe there is some kind of bug in the BigQuery source that leads to problems like this. I just cannot nail down what is causing it, since it happens randomly. Like I wrote, the last time I encountered it I just cleared the temp dir on GCS and re-uploaded my template (without any code changes), and the job started working again.