I hope someone can help me as I'm at a dead end. I'm learning Apache Beam and working on a streaming pipeline (Python 3.7, apache-beam 2.15, DirectRunner for now) that reads from a PubSub source.
I apply multiple transforms to the same PCollection to produce multiple outputs, ultimately sinking them into 2 different BQ tables.
However, when I run the pipeline, these transforms seem to influence each other (by mutating elements), causing a schema mismatch when the bigquery.WriteToBigQuery transform executes.
Each PubSub message, along with its attributes, is formatted as follows:
{'data': json.dumps({ "key1": "val1", "key2": "val2"}).encode(),
'attributes':{'evt_time': '2019-09-14T22:12:43.323546Z'}}
I'd like to read these messages and dump them into 2 BQ tables:
table1: raw_table
schema: evt_time, payload
(stores the raw data along with evt_time extracted from the message attributes)
{'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}
table2: parsed_table
schema: evt_time, key1, key2
(parses & stores each KV pair from the data as a flat table, along with evt_time extracted from the message attributes)
{'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}
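For reference, the schema strings passed to WriteToBigQuery below use the comma-separated name:type form and look roughly like this (the TIMESTAMP/STRING types are my assumption based on the sample values above):

SCHEMA_RAW_TABLE = 'evt_time:TIMESTAMP,payload:STRING'
SCHEMA_PARSED_TABLE = 'evt_time:TIMESTAMP,key1:STRING,key2:STRING'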
This is my pseudo code so far:
1) A message is read into the pipeline, then a Map transform extracts data into a payload field and evt_time into an evt_time field:
import json
import apache_beam as beam
from apache_beam.io.gcp import bigquery

row = (p
       | "read_sub" >> beam.Create([{'data': json.dumps({"key1": "val1", "key2": "val2"}).encode(),
                                     'attributes': {'evt_time': '2019-09-14T22:12:43.323546Z'}}])
       | "add_timestamps" >> beam.Map(add_timestamps))
2) I then store row into table1 raw_table:
row | "raw_stream_to_bq" >> bigquery.WriteToBigQuery(project=PROJECT,
        dataset=BQ_DATASET, table="test_raw", schema=SCHEMA_RAW_TABLE)
3) Apply further parsing to row, extracting the KV pairs, before storing each element into table2 parsed_table:
row | "parse" >> beam.Map(parse_payload) \
    | "parsed_stream_to_bq" >> bigquery.WriteToBigQuery(project=PROJECT,
          dataset=BQ_DATASET, table="test_parsed", schema=SCHEMA_PARSED_TABLE)
The map functions add_timestamps and parse_payload are defined as follows:
def add_timestamps(e, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    # Keep the raw JSON payload as a string and lift evt_time out of the attributes.
    payload = e['data'].decode()
    evt_time = e['attributes']['evt_time']
    row = {'evt_time': evt_time, 'payload': payload}
    return row
def parse_payload(e, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    # Pop the raw JSON payload string and merge its parsed KV pairs into the
    # element (pop() and update() modify the incoming dict in place).
    payload = json.loads(e.pop('payload'))
    e.update(payload)
    return e
However, when I run the pipeline it gives the BQ streaming insert error below. I get the feeling that, due to downstream step #3, each row element is already parsed/mutated by the time the step #2 BQ write transform runs, so the streaming inserts in step #2 fail with a schema mismatch:
Errors are [
errors: [
debugInfo: ''
location: 'key1'
message: 'no such field.'
reason: 'invalid'>]
index: 0>]
UPDATE:
1) When I use the WriteToText transform instead of WriteToBigQuery, it works fine, as expected:
step #1 output:
{'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}
step #3 map transform output:
{'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}
2) HOWEVER, when using WriteToBigQuery, both of the above steps output the same parsed KV pairs:
{'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}
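This makes sense once you notice that parse_payload returns the very same dict it receives, so whichever write runs second sees the mutated element. A quick plain-Python illustration of the aliasing (no Beam involved):

d = {'evt_time': '2019-09-14T22:12:43.323546Z',
     'payload': '{"key1": "val1", "key2": "val2"}'}
parsed = parse_payload(d)
print(parsed is d)  # True: same object, not a copy
print(d)            # the "raw" element now carries key1/key2 and has lost 'payload'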
UPDATE 2 - RESOLVED
I made two changes following @Guillem's suggestions below (and it now works on DirectRunner as well):
1) Avoid modifying elements in-place.
2) Removed the schema= parameter from the bigquery.WriteToBigQuery transforms and pre-built the BQ tables before launching the pipeline, after noticing the following error sometimes during the first write (a sketch of both fixes follows the error output):
File "/PATH_TO_PY/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request) apitools.base.py.exceptions.HttpConflictError: HttpError accessing https://www.googleapis.com/bigquery/v2/projects/arctic-rite-234823/datasets/pipeline/tables?alt=json: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Mon, 16 Sep 2019 02:52:35 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '409', 'content-length': '334', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 409,
"message": "Already Exists: Table PROJECT_ID:DATASET_ID.TABLE_ID”,
"errors": [
{ "message": "Already Exists: Table PROJECT_ID:DATASET_ID.TABLE_ID", "domain": "global", "reason": "duplicate" }
],
"status": "ALREADY_EXISTS"
}
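For completeness, a sketch of what both fixes can look like (not my exact code: a parse_payload that builds a new dict, and the write without schema= plus CREATE_NEVER so Beam doesn't try to create the pre-built tables):

def parse_payload(e, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    # Build a new dict instead of mutating the incoming element in place.
    row = {k: v for k, v in e.items() if k != 'payload'}
    row.update(json.loads(e['payload']))
    return row

row | "parse" >> beam.Map(parse_payload) \
    | "parsed_stream_to_bq" >> bigquery.WriteToBigQuery(
        project=PROJECT, dataset=BQ_DATASET, table="test_parsed",
        # Tables are created ahead of time, so tell Beam not to create them.
        create_disposition=bigquery.BigQueryDisposition.CREATE_NEVER)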