
I hope someone can help me as I'm at a dead-end now. I'm learning Apache Beam and working on a streaming pipeline (Python 3.7, apache-beam 2.15, DirectRunner for now) that reads from a PubSub source.

I apply multiple transforms to the same PCollection to produce multiple outputs, ultimately sinking them into 2 different BQ tables.

However, when I run the pipeline, these transforms seem to influence each other (by mutating elements), which causes a schema mismatch when the bigquery.WriteToBigQuery transform executes.

Each PubSub message, along with its message attributes, is formatted as follows:

{'data': json.dumps({ "key1": "val1", "key2": "val2"}).encode(), 'attributes':{'evt_time': '2019-09-14T22:12:43.323546Z'}}
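For context, this is roughly how such a message could be published with the google-cloud-pubsub client (a minimal sketch; the project and topic names are placeholders, and attributes are passed as keyword arguments):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Placeholder project/topic names, for illustration only.
topic_path = publisher.topic_path("my-project", "my-topic")

# The JSON payload goes in the message body; evt_time travels as an attribute.
publisher.publish(
    topic_path,
    data=json.dumps({"key1": "val1", "key2": "val2"}).encode(),
    evt_time="2019-09-14T22:12:43.323546Z",
)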

I'd like to read these messages and dump them to 2 BQ tables:

  • table1 raw_table schema: evt_time, payload (storing the raw data along with the event time extracted from the message attribute) {'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}

  • table2 parsed_table schema: evt_time, key1, key2 (parsing & storing each KV pair in the data as a flat table along with the event time extracted from the message attribute) {'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'} (the corresponding schema strings are sketched right after this list)
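The SCHEMA_RAW_TABLE and SCHEMA_PARSED_TABLE constants used below aren't shown in my snippets; as a rough sketch (the exact field names and types here are an assumption), they could be the comma-separated schema strings that WriteToBigQuery accepts:

# Assumed schema strings in the "name:TYPE" format accepted by WriteToBigQuery.
SCHEMA_RAW_TABLE = 'evt_time:TIMESTAMP,payload:STRING'
SCHEMA_PARSED_TABLE = 'evt_time:TIMESTAMP,key1:STRING,key2:STRING'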

This is my pseudocode so far.

1) The message is read into the pipeline, and a Map transform extracts the data into a payload field and evt_time into an evt_time field (here I use beam.Create to simulate the PubSub read; a sketch of the real ReadFromPubSub version follows the snippet):

row = (p
       | "read_sub" >> beam.Create([{'data': json.dumps({"key1": "val1", "key2": "val2"}).encode(),
                                     'attributes': {'evt_time': '2019-09-14T22:12:43.323546Z'}}])  # simulate one PubSub message
       | "add_timestamps" >> beam.Map(add_timestamps))

2) I then store row into table1 raw_table:

row | "raw_stream_to_bq" >>bigquery.WriteToBigQuery(project = PROJECT,
 dataset = BQ_DATASET, table ="test_raw", schema=SCHEMA_RAW_TABLE)
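For completeness, the same write with the dispositions spelled out (these are WriteToBigQuery's defaults, so this sketch only makes the behaviour explicit; with pre-created tables one could switch to CREATE_NEVER and drop schema=):

row | "raw_stream_to_bq" >> bigquery.WriteToBigQuery(
        project=PROJECT,
        dataset=BQ_DATASET,
        table="test_raw",
        schema=SCHEMA_RAW_TABLE,
        # Defaults shown explicitly: create the table if missing, append rows.
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)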

3) Apply further parsing to row, extracting each KV pair, before storing each element into table2 parsed_table:

row | "parse" >> beam.Map(parse_payload) \
    | "parsed_stream_to_bq" >> bigquery.WriteToBigQuery(project=PROJECT, 
dataset=BQ_DATASET, table="test_parsed", schema=SCHEMA_PARSED_TABLE)

The map functions add_timestamps and parse_payload are defined as follows:

def add_timestamps(e, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    # Extract the raw JSON payload and the event time attribute into a flat dict.
    payload = e['data'].decode()
    evt_time = e['attributes']['evt_time']
    row = {'evt_time': evt_time, 'payload': payload}
    return row

def parse_payload(e, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    # Pop the 'payload' field and merge its parsed key/value pairs into the element.
    payload = json.loads(e.pop('payload'))
    e.update(payload)
    return e

However, when I run the pipeline it gives the BQ streaming insert error below. I get the feeling that, due to downstream step #3, each row element is already parsed/mutated by the time the step #2 BQ write transform is applied, so the BQ streaming inserts in step #2 fail with a schema mismatch (a small plain-Python illustration of this follows the error listing).

Errors are [
  errors: [
    debugInfo: ''
    location: 'key1'
    message: 'no such field.'
    reason: 'invalid'>]
  index: 0>]
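To illustrate what I suspect is happening (plain Python, outside Beam): if both branches receive the same dict object, the in-place mutation done in parse_payload also changes what the raw write sees:

import json

row = {'evt_time': '2019-09-14T22:12:43.323546Z',
       'payload': '{"key1": "val1", "key2": "val2"}'}

def parse_payload(e):
    e.update(json.loads(e.pop('payload')))  # mutates the shared dict
    return e

parsed = parse_payload(row)
print(row)
# {'evt_time': '...', 'key1': 'val1', 'key2': 'val2'}  <- 'payload' is gone,
# so a write expecting the raw schema now sees 'key1'/'key2' instead.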

UPDATE:

1) When I use the WriteToText transform instead of WriteToBigQuery, it works fine, as expected:

  • step #1 output: {'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}

  • step #3 map transform output: {'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}

2) HOWEVER, when using WriteToBigQuery, both of the above steps output the same parsed KV pairs: {'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}

UPDATE 2 - RESOLVED

I made two changes following @Guillem's suggestions below (and it also works on the DirectRunner now):

1) Avoid modifying elements in-place.

2) Removed the schema= parameter from the bigquery.WriteToBigQuery transforms and pre-built the BQ tables before launching the pipeline, after noticing the following error sometimes during the first write (a sketch of pre-creating the tables follows the error):

File "/PATH_TO_PY/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpConflictError: HttpError accessing
https://www.googleapis.com/bigquery/v2/projects/arctic-rite-234823/datasets/pipeline/tables?alt=json:
response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8',
'date': 'Mon, 16 Sep 2019 02:52:35 GMT', 'server': 'ESF', 'cache-control': 'private',
'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff',
'transfer-encoding': 'chunked', 'status': '409', 'content-length': '334', '-content-encoding': 'gzip'}>,
content <{
  "error": {
    "code": 409,
    "message": "Already Exists: Table PROJECT_ID:DATASET_ID.TABLE_ID",
    "errors": [
      {
        "message": "Already Exists: Table PROJECT_ID:DATASET_ID.TABLE_ID",
        "domain": "global",
        "reason": "duplicate"
      }
    ],
    "status": "ALREADY_EXISTS"
  }
}
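For reference, this is roughly how the tables can be pre-created before launching the pipeline (a sketch using the google-cloud-bigquery client, imported as bq to avoid clashing with the Beam bigquery module above; the field names/types match the assumed schema strings earlier):

from google.cloud import bigquery as bq

client = bq.Client(project=PROJECT)

# Assumed field definitions matching the schema strings sketched above.
raw_schema = [bq.SchemaField('evt_time', 'TIMESTAMP'),
              bq.SchemaField('payload', 'STRING')]
parsed_schema = [bq.SchemaField('evt_time', 'TIMESTAMP'),
                 bq.SchemaField('key1', 'STRING'),
                 bq.SchemaField('key2', 'STRING')]

for name, schema in [('test_raw', raw_schema), ('test_parsed', parsed_schema)]:
    table = bq.Table(f'{PROJECT}.{BQ_DATASET}.{name}', schema=schema)
    client.create_table(table, exists_ok=True)  # no 409 if the table already exists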


1 Answer


Commenting out the second write allows the first one to succeed, so it seems to be an issue with parse_payload and the DirectRunner.

To fix it, you can avoid modifying elements in-place by making a copy inside the Map function:

def parse_payload(element, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    e = element.copy()  # work on a copy so the input element is left untouched
    payload = json.loads(e.pop('payload'))
    e.update(payload)
    return e
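An equivalent variant (just a sketch) builds a brand-new dict instead of copying and popping, which makes the no-mutation intent explicit:

def parse_payload(element, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    # Construct a fresh dict; the original element is never touched.
    parsed = json.loads(element['payload'])
    return {'evt_time': element['evt_time'], **parsed}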

It works well without changes using the DataflowRunner:

[screenshot: pipeline running successfully on Dataflow]

as the graph is constructed as expected (two separate branches):

[screenshot: Dataflow job graph showing two separate branches]

Full repro code.