0
votes

Hi,

I'm using Beam to read from a BigQuery table and saw that read() with a SerializableFunction performs better than readTableRows(). I'm following the example at https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html#read-org.apache.beam.sdk.transforms.SerializableFunction-

My BigQuery columns are:

|Field name | Field type|
|Date_Time  | TIMESTAMP |
|Simple_Id  | STRING    |
|A_Price    | NUMERIC   |

My code looks like:

public class ConvertBQSchemaRecordToProtoDataFn
        implements SerializableFunction<SchemaAndRecord, ProtoValueType> {

    @Override
    public ProtoValueType apply(SchemaAndRecord schemaAndRecord) {
        GenericRecord avroRecord = schemaAndRecord.getRecord();

        long dateTimeMillis = (Long) avroRecord.get("Date_Time");
        String simpleId = avroRecord.get("Simple_Id").toString();
        double aPrice = convertToDouble(avroRecord.get("A_Price").toString());
        // ...

The long and String are fine. However, when I try to convert the NUMERIC column, the debugger shows the GenericRecord value as a HeapByteBuffer, which can't be cast directly. I'm not sure how to get the value of "A_Price":

(debugger screenshot showing A_Price as a HeapByteBuffer)

The calling pipeline code looks like:

PCollection<ProtoValueType> protoData =
    pipeline.apply("BigQuery Read",
        BigQueryIO.read(new ConvertBQSchemaRecordToProtoDataFn())
            .fromQuery(sqlQuery)
            .usingStandardSql()
            .withCoder(ProtoCoder.of(ProtoValueType.class)));

I'm not sure if the Coder is used or not. ProtoValueType is a protobuf generated binding class.

My question is: How do I get the value of the NUMERIC type from GenericRecord (which I think is an Avro object)?

Any help appreciated. I can get the rows using readTableRows(), where everything comes back as strings, so I'm not looking for an explanation of that method.

1
Just for clarification, you are reading from BigQuery and where are you writing the output ? - Alexandre Moraes
Hi Alexandre, I'm reading from BigQuery and transforming the rows into a protobuf object that I will pass into another function (for instance, averaging the aPrice values). The output could be the average values, or something else (still writing the pipeline). - Fred Tsang

1 Answer

1
votes

The GenericRecord field corresponding to a NUMERIC column has some additional schema properties that you can use to parse the value as a java.math.BigDecimal.

The schema of such a field will be of type BYTES, and will look something like the following:

{"type":"bytes","logicalType":"decimal","precision":38,"scale":9}
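Concretely, the bytes are the big-endian two's-complement representation of the unscaled value, and the scale comes from the decimal logical type in the schema (9 for BigQuery NUMERIC). Here is a minimal sketch of the conversion with the scale passed in directly rather than read from the Avro schema; `decodeNumeric` is a hypothetical helper name, and in your apply() you would call it on the ByteBuffer returned by avroRecord.get("A_Price"):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class NumericDecoder {

    // BigQuery NUMERIC arrives as Avro BYTES with logicalType "decimal"
    // (precision 38, scale 9): the bytes hold the two's-complement,
    // big-endian unscaled value.
    static BigDecimal decodeNumeric(ByteBuffer buffer, int scale) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes); // copy without consuming the buffer
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        // Unscaled value 1234500000 with scale 9 represents 1.2345
        ByteBuffer buf = ByteBuffer.wrap(
                BigInteger.valueOf(1_234_500_000L).toByteArray());
        BigDecimal price = decodeNumeric(buf, 9);
        System.out.println(price); // 1.234500000
    }
}
```

In real pipeline code you would read the scale from the field's schema (via its decimal logical type) instead of hardcoding 9, so the function keeps working if the column type changes.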

I have just published a blog post explaining how to use those schema properties to turn the array of bytes into a java.math.BigDecimal:

https://medium.com/@iht/reading-numeric-fields-with-bigqueryio-in-apache-beam-23273a9d0c99