I have a Dataflow pipeline parsing data from Pub/Sub into BigQuery. The data is in proto3 format.
The data I receive from Pub/Sub is encoded with protobuf's 'SerializeToString()' method.
Then I deserialize it and insert the parsed data into BigQuery, and that works perfectly. However, I have been asked to also store the binary protobuf data exactly as I received it, in case something goes wrong at insertion time.
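For reference, the step that already works parses the payload roughly like this (a sketch; MyStatus and its field names are placeholders for my actual generated proto3 types):

import apache_beam as beam
from status_pb2 import MyStatus  # placeholder for my generated proto3 module

class ParseStatus(beam.DoFn):
    def process(self, element):
        # element.data is the raw proto3 payload produced by SerializeToString()
        msg = MyStatus()
        msg.ParseFromString(element.data)
        yield {
            'id': msg.id,        # placeholder field names
            'state': msg.state,
        }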
To do that, I created a simple BigQuery table with a single field 'data' of type BYTES.
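The backup table was created along these lines (a sketch using the google-cloud-bigquery client; project, dataset, and table names are placeholders):

from google.cloud import bigquery

# Create a one-column table that stores the raw protobuf payload as BYTES
client = bigquery.Client(project='my_project')
table = bigquery.Table(
    'my_project.my_dataset.my_table',
    schema=[bigquery.SchemaField('data', 'BYTES', mode='NULLABLE')],
)
client.create_table(table)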
So I added a step to my pipeline that just takes the data from the Pub/Sub message and returns it:
import logging
from typing import Dict

import apache_beam as beam

class GetBytes(beam.DoFn):
    def process(self, element):
        # element is the PubSub message; element.data holds the raw payload bytes
        obj: Dict = {
            'data': element.data
        }
        logging.info(f'data bytes: {obj}')
        logging.info(f'data type: {type(obj["data"])}')
        return [obj]
Here are the lines from the pipeline I use to write to BigQuery:
bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery('my_project:my_dataset.my_table')
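For completeness, the same write step expanded with an explicit schema and dispositions would look roughly like this (a sketch, not what I currently run; I rely on the existing table instead of passing a schema):

# Sketch: explicit schema and dispositions on the backup write step
bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(
    'my_project:my_dataset.my_table',
    schema='data:BYTES',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)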
The logs seem to show the right data:
2020-09-29 11:16:40.094 CESTdata bytes: {'data': b'\n\x04\x08\x01\x10\x02\n\x04\x08\x02\x10\x02\n\x02\x08\x03\n\x04\x08\x04\x10\x02\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x02\n\x02\x08\x07\n\x04\x08\x08\x10\x01\n\x02\x08\t\n\x04\x08\n\x10\x01\n\x04\x08\x0b\x10\x02\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x01\n\x04\x08\x12\x10\x01\n\x04\x08\x01\x10\x02\n\x02\x08\x02\n\x04\x08\x03\x10\x01\n\x02\x08\x04\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x01\n\x04\x08\x07\x10\x02\n\x02\x08\x08\n\x04\x08\t\x10\x01\n\x04\x08\n\x10\x02\n\x04\x08\x0b\x10\x01\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x02\n\x04\x08\x12\x10\x02\x10\xb4\x95\x99\xc9\xcd.'}
But I keep receiving errors like:
UnicodeDecodeError: 'utf-8 [while running 'generatedPtransform-297']' codec can't decode byte 0x89 in position 101: invalid start byte
(the error may not correspond to the log line above, but it is always this kind of message)
I tried inserting my bytes data from the BigQuery UI and everything went fine...
Any idea what is going wrong?
Thank you :)