
I have a Dataflow pipeline parsing data from Pub/Sub into BigQuery. The data is in proto3 format.

The data I receive from Pub/Sub is encoded with protobuf's 'SerializeToString()' method.
I deserialize it and insert the parsed data into BigQuery, and that works perfectly. However, I have been asked to also store the binary protobuf data as I received it, in case something goes wrong at insertion time.
To do that, I created a simple BigQuery table with only one field, 'data', accepting BYTES.
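For reference, a table with that single BYTES column can be created with the google-cloud-bigquery client library, roughly like this (a sketch; the project, dataset, and table names are the ones used in the pipeline below):

from google.cloud import bigquery

client = bigquery.Client(project='my_project')
# One nullable BYTES column to hold the raw protobuf payload.
schema = [bigquery.SchemaField('data', 'BYTES', mode='NULLABLE')]
client.create_table(bigquery.Table('my_project.my_dataset.my_table', schema=schema))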

So I added a step to my pipeline; it simply gets the data from the Pub/Sub message and returns it:

import logging
from typing import Dict

import apache_beam as beam


class GetBytes(beam.DoFn):
    def process(self, element):
        # Keep only the raw payload of the Pub/Sub message.
        obj: Dict = {
            'data': element.data
        }
        logging.info(f'data bytes: {obj}')
        logging.info(f'data type: {type(obj["data"])}')
        return [obj]

Here are the lines from the pipeline I use to insert into BQ:

    bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
    bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery('my_project:my_dataset.my_table')
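
For completeness, the same write step can also be given an explicit schema (a sketch; the schema string and write_disposition are standard WriteToBigQuery parameters):

    bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(
        'my_project:my_dataset.my_table',
        schema='data:BYTES',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)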

The logs seem to show the right data:

2020-09-29 11:16:40.094 CEST data bytes: {'data': b'\n\x04\x08\x01\x10\x02\n\x04\x08\x02\x10\x02\n\x02\x08\x03\n\x04\x08\x04\x10\x02\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x02\n\x02\x08\x07\n\x04\x08\x08\x10\x01\n\x02\x08\t\n\x04\x08\n\x10\x01\n\x04\x08\x0b\x10\x02\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x01\n\x04\x08\x12\x10\x01\n\x04\x08\x01\x10\x02\n\x02\x08\x02\n\x04\x08\x03\x10\x01\n\x02\x08\x04\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x01\n\x04\x08\x07\x10\x02\n\x02\x08\x08\n\x04\x08\t\x10\x01\n\x04\x08\n\x10\x02\n\x04\x08\x0b\x10\x01\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x02\n\x04\x08\x12\x10\x02\x10\xb4\x95\x99\xc9\xcd.'}

But I keep receiving errors like:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 101: invalid start byte [while running 'generatedPtransform-297']

(The error may not correspond to the log above, but the messages are always of this kind.)

I tried inserting my bytes data via the BigQuery UI and everything went fine...

Any idea what is going wrong?

Thank you :)

what version of Beam are you using? – Pablo
hey @Pablo, I use apache-beam[gcp]==2.24.0 – Kimor

1 Answer

BigQuery requires bytes values to be base64-encoded when they are written this way: WriteToBigQuery serializes each row to JSON for the insert, and raw bytes are not valid JSON. You can find documentation and links with more details at https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.io.gcp.bigquery.html#additional-parameters-for-bigquery-tables
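
For example, your GetBytes DoFn could base64-encode the payload before handing it to WriteToBigQuery; a minimal sketch:

import base64

import apache_beam as beam


class GetBytes(beam.DoFn):
    def process(self, element):
        # BigQuery's JSON-based insert API expects BYTES values as
        # base64-encoded strings, not raw Python bytes.
        yield {'data': base64.b64encode(element.data).decode('utf-8')}

BigQuery still stores the column as BYTES; the base64 encoding only applies to the JSON payload on the wire, which is why inserting the raw bytes from the BigQuery UI worked fine.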