I'm new to GC Dataflow and didn't find a relevant answer here. Apologies if I should have found this already answered.

I'm trying to create a simple pipeline using the v2.0 SDK and am having trouble reading data into my PCollection using BigQueryIO. I am using the .fromQuery method, and I have tested the query in the BigQuery interface, where it works fine. The initial PCollection seems to get created without any issues, but when I then set up a simple ParDo function to convert the values from the TableRow into a PCollection of strings, I get a NullPointerException on the line of code that calls .get on the TableRow object.

Here is my code. (I'm probably missing something simple. I'm a total newbie at Pipeline programming. Any input would be most appreciated.)

public class ClientAutocompletePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class);


    public static void main(String[] args) {
        //  create the pipeline  
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // A step to read in the product names from a BigQuery table
        p.apply(BigQueryIO.read().fromQuery("SELECT name FROM [beaming-team-169321:Products.raw_product_data]"))

        .apply("ExtractProductNames", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Grab a row from the BigQuery Results
                TableRow row = c.element();

                // Get the value of the "name" column from the table row.
                //NOTE: This is the line that is giving me the NullPointerException 
                String productName = row.get("name").toString();

                // Make sure it isn't empty
                if (!productName.isEmpty()) {
                    c.output(productName);
                }
            }
        }));

        // Run the pipeline
        p.run();
    }
}

The query definitely works in the BigQuery UI and the column called "name" is returned when I test the query. Why am I getting a NullPointerException on this line:

String productName = row.get("name").toString();

Any ideas?

Are all of the values in the name column guaranteed not to be null? - Elliott Brossard
If you run SELECT name FROM [beaming-team-169321:Products.raw_product_data] where name is null in BigQuery, you'll see that there are null values. So you need to take this into consideration in your pipeline. - Graham Polley
Well, now that you mention it, that does make perfect sense. I guess I was under the incorrect impression that something was causing them all to be null because I had some error in my code, but I guess that might not have been the case. Thanks for replying! - MrSimmonsSr

3 Answers

0
votes

This is a common problem when working with BigQuery and Dataflow (most likely the field is indeed null). If you are OK with using Scala, you could take a look at Scio, a Scala DSL for Dataflow, and its BigQuery IO.

0
votes

Just make your code null safe. Replace this:

String productName = row.get("name").toString();

With something like this:

String productName = String.valueOf(row.get("name"));
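
One caveat with the replacement above: String.valueOf on a null Object does not throw, but it returns the four-character text "null", which would then pass the isEmpty() check and be emitted as a product name. A minimal sketch of an explicit null check follows. It assumes TableRow can be treated as a Map<String, Object>, so a plain HashMap stands in for it here; the class name NullSafeName and the helper productName are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class NullSafeName {
    // Returns the "name" value, or empty when it is missing, null, or blank.
    // Assumption: a TableRow can be passed wherever a Map<String, Object> is expected.
    public static Optional<String> productName(Map<String, Object> row) {
        Object value = row.get("name"); // null when the column is NULL or absent
        if (value == null) {
            return Optional.empty();
        }
        String name = value.toString();
        return name.isEmpty() ? Optional.empty() : Optional.of(name);
    }

    public static void main(String[] args) {
        Map<String, Object> withName = new HashMap<>();
        withName.put("name", "Widget");
        Map<String, Object> withoutName = new HashMap<>();

        System.out.println(productName(withName).isPresent());    // true
        System.out.println(productName(withoutName).isPresent()); // false

        // String.valueOf avoids the exception but produces the text "null"
        System.out.println(String.valueOf(withoutName.get("name")));
    }
}
```

Inside the DoFn, the same idea would amount to calling c.output only when the value is present, so rows without a name are skipped rather than crashing the step.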

0
votes

I think I'm late for this, but you can check with something like if (row.containsKey("column-name")), which tells you whether the field is present in the row. When data is read from BigQuery, a column whose value is null is not included in that particular TableRow, which is why you are getting that error. You can also check if (null == row.get("column-name")) to test whether the field is null.
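
The two checks mentioned here are not quite equivalent, which a small sketch can show. It again assumes TableRow behaves like a Map<String, Object> and uses a plain HashMap as a stand-in; FieldPresence and hasValue are hypothetical names. containsKey only detects an omitted key, while comparing get to null covers both an omitted key and a key that is explicitly mapped to null.

```java
import java.util.HashMap;
import java.util.Map;

class FieldPresence {
    // True only when the column exists in the row and maps to a non-null value.
    public static boolean hasValue(Map<String, Object> row, String column) {
        return row.get(column) != null;
    }

    public static void main(String[] args) {
        Map<String, Object> absent = new HashMap<>();       // key omitted entirely
        Map<String, Object> explicitNull = new HashMap<>();
        explicitNull.put("name", null);                     // key present, value null

        System.out.println(absent.containsKey("name"));       // false
        System.out.println(explicitNull.containsKey("name")); // true, although the value is null
        System.out.println(hasValue(absent, "name"));         // false
        System.out.println(hasValue(explicitNull, "name"));   // false
    }
}
```

If null columns really are always omitted from the row, both checks give the same answer, but the get-based check seems the safer default because it does not depend on that behavior.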