
I'm trying to create a streaming pipeline with Dataflow that reads messages from a Pub/Sub topic and ends up writing them to a BigQuery table. I don't want to use any Dataflow template.

For the moment I just want to create the pipeline in a Python 3 script executed from a Google Compute Engine VM instance, to load and transform every message that arrives from Pub/Sub (parsing the records it contains and adding a new field) and finally write the results to a BigQuery table.

Simplifying, my code would be:

#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta

def load_pubsub(message):
    try:
        data = json.loads(message)
        records = data["messages"]
        return records
    except Exception:
        raise ValueError("Something went wrong reading data from the Pub/Sub topic")

class ParseTransformPubSub(beam.DoFn):
    def __init__(self):
        self.water_mark = (datetime.now() + timedelta(hours = 1)).strftime("%Y-%m-%d %H:%M:%S.%f")
    def process(self, records):
        for record in records:
            record["E"] = self.water_mark 
            yield record

def main():
    table_schema = apache_beam.io.gcp.bigquery.parse_table_schema_from_json(open("TableSchema.json").read())
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic')
    parser.add_argument('--output_table')
    known_args, pipeline_args = parser.parse_known_args(sys.argv[1:])
    with beam.Pipeline(argv = pipeline_args) as p:
        pipe = ( p | 'ReadDataFromPubSub' >> beam.io.ReadStringsFromPubSub(known_args.input_topic)
                   | 'LoadJSON' >> beam.Map(load_pubsub)
                   | 'ParseTransform' >> beam.ParDo(ParseTransformPubSub())
                   | 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
                                      table = known_args.output_table,
                                      schema = table_schema,
                                      create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                      write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
                )
        # The with-block runs the pipeline and waits for completion on exit.

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
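
The TableSchema.json read above holds the table schema in BigQuery's JSON format. For the sample message shown below it would look roughly like this (the field types, and TIMESTAMP for "E", are just an assumption):

{"fields": [
  {"name": "A", "type": "STRING", "mode": "NULLABLE"},
  {"name": "B", "type": "STRING", "mode": "NULLABLE"},
  {"name": "C", "type": "INTEGER", "mode": "NULLABLE"},
  {"name": "D", "type": "INTEGER", "mode": "NULLABLE"},
  {"name": "E", "type": "TIMESTAMP", "mode": "NULLABLE"}
]}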

For example, the messages published in the Pub/Sub topic typically look like this:

'{"messages":[{"A":"Alpha", "B":"V1", "C":3, "D":12},{"A":"Alpha", "B":"V1", "C":5, "D":14},{"A":"Alpha", "B":"V1", "C":3, "D":22}]}'

Once the field "E" has been added to the record, the structure of the record (a Python dictionary) and the data types of its fields are exactly what the BigQuery table expects.
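
For instance, after the ParseTransform step, the first record of the sample message above would look like this (the value of "E" is just illustrative):

{"A": "Alpha", "B": "V1", "C": 3, "D": 12, "E": "2020-01-01 12:00:00.000000"}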

The problems that I want to handle are:

  1. If some messages come with an unexpected structure, I want to fork the pipeline flow and write them to another BigQuery table.

  2. If some messages come with an unexpected data type in a field, the error will only occur at the last stage of the pipeline, when they are written to the table. I want to manage this type of error by diverting the record to a third table.

I read the documentation on the following pages but found nothing that covers this:

https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
https://cloud.google.com/dataflow/docs/guides/common-errors

By the way, if I choose the option of configuring the pipeline through the template that reads from a Pub/Sub subscription and writes into BigQuery, I get the following pipeline graph, which turns out to be the same one I'm looking for:

Template: Cloud Pub/Sub Subscription to BigQuery

1 Answer


You can't catch the errors that occur in the BigQuery sink; the messages that you write to BigQuery must already be valid.

The best pattern is to perform a transform that checks the structure of your messages and the types of their fields. In case of error, you create an error flow and write this flow to a file, or to a table without a strict schema where you store the raw message as plain text.
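
A minimal sketch of that pattern, using Beam's tagged outputs. Here "parsed" stands for the output of your 'ParseTransform' step, and the validation rules, the extra "error_table" argument and its raw:STRING schema are assumptions for illustration, not part of your original pipeline:

import json
import apache_beam as beam

# Expected field names and Python types, derived from the sample message in the question.
EXPECTED_TYPES = {"A": str, "B": str, "C": int, "D": int}

class ValidateRecord(beam.DoFn):
    # Routes well-formed records to the main output; anything else goes to the 'dead_letter' output.
    DEAD_LETTER = 'dead_letter'

    def process(self, record):
        is_valid = (isinstance(record, dict)
                    and all(isinstance(record.get(field), expected)
                            for field, expected in EXPECTED_TYPES.items()))
        if is_valid:
            yield record
        else:
            # Keep the raw message as plain text so it can be stored without a strict schema.
            yield beam.pvalue.TaggedOutput(self.DEAD_LETTER, {'raw': json.dumps(record, default=str)})

# Wiring inside the pipeline, replacing the single WriteToBigQuery step.
validated = parsed | 'Validate' >> beam.ParDo(ValidateRecord()).with_outputs(
    ValidateRecord.DEAD_LETTER, main='valid')

validated.valid | 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
    table=known_args.output_table,
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

validated.dead_letter | 'WriteToErrorTable' >> beam.io.WriteToBigQuery(
    table=known_args.error_table,  # hypothetical extra --error_table argument
    schema='raw:STRING',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

This covers the first problem (unexpected structure); the type checks in ValidateRecord are also where you catch the second one, before the record ever reaches the BigQuery sink.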