
I am trying to write to a BigQuery table using Cloud Dataflow. The table has an INTEGER column that is set to NULLABLE. For null values, the write fails with the following error:

Could not convert value to integer. Field: ITM_QT; Value:

When I change the data type of the same column to STRING, null values are accepted and the error goes away.

Is there any way to write null values to an INTEGER column using Cloud Dataflow?

This problem also occurs for null timestamp values, so I am facing it for the integer, float, and timestamp data types. - abhishek jha
Are you sure you are setting null (an actual null), and not "null" as a String value? - Graham Polley

1 Answer


I'm not sure what is going wrong on your side, but the following code works fine and does write null values to INTEGER and FLOAT columns in BigQuery:

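// Imports assume the Dataflow SDK for Java 1.x; the class name is arbitrary.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.List;

public class WriteNullNumbersTest {
    public static void main(String[] args) {
        // Run locally with the direct runner.
        DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(DirectPipelineRunner.class);
        options.setProject("<project-id>");

        Pipeline pipeline = Pipeline.create(options);

        // Read any table, then emit one row with null values per input row.
        PCollection<TableRow> results = pipeline
                .apply("whatever", BigQueryIO.Read.from("<table-spec>"))
                .apply(ParDo.of(new DoFn<TableRow, TableRow>() {
                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        System.out.println(c.element());
                        TableRow row = new TableRow();
                        row.set("foo", null); // null FLOAT (an actual null, not the string "null")
                        row.set("bar", null); // null INTEGER
                        c.output(row);
                    }
                }));

        // Schema of the output table: one FLOAT and one INTEGER column.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("foo").setType("FLOAT"));
        fields.add(new TableFieldSchema().setName("bar").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

        // Write the rows, creating the table if needed and replacing its contents.
        results.apply(BigQueryIO.Write
                .named("Write")
                .to("<project-id>:<dataset-name>.write_null_numbers_test")
                .withSchema(schema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        pipeline.run();
    }
}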
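
One possible cause, going by the empty `Value:` in the error message and the comment asking whether an actual null is being set, is that the pipeline passes an empty string (or the text "null") instead of a real null. That is only a guess, since the question does not show the failing code. Below is a minimal sketch of normalizing such input before it is put on a row. The class `NullNormalizer`, the helper `toNullableLong`, and the column `OTHER_QT` are hypothetical names, and a plain `Map` stands in for `TableRow` so the example runs without the Dataflow SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullNormalizer {
    // Hypothetical helper: returns an actual null for missing, blank, or
    // literal "null" input; otherwise parses the text as an integer.
    static Long toNullableLong(String raw) {
        if (raw == null) {
            return null;
        }
        String trimmed = raw.trim();
        if (trimmed.isEmpty() || trimmed.equalsIgnoreCase("null")) {
            return null;
        }
        return Long.valueOf(trimmed);
    }

    public static void main(String[] args) {
        // A plain Map stands in for TableRow here (assumption for illustration).
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ITM_QT", toNullableLong(""));     // empty input becomes a real null
        row.put("OTHER_QT", toNullableLong("42")); // numeric input is parsed
        System.out.println(row); // prints {ITM_QT=null, OTHER_QT=42}
    }
}
```

In a real pipeline the same helper would be called inside `processElement` before `row.set(...)`, so that the INTEGER column receives either a number or null and never an empty string.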
