3
votes

I am attempting to use dataflow to read a pubsub message and write it to big query. I was given alpha access by the Google team and have gotten the provided examples working but now I need to apply it to my scenario.

Pubsub payload:

Message {
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    attributes: {}
}

Big Query Schema:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',

My goal is to simply read the message payload and insert into bigquery. I am struggling with getting my head around the transformations and how should I map the key/values to the big query schema.

I am very new to this so any help is appreciated.

Current code:https://codeshare.io/ayqX8w

Thanks!

3

3 Answers

4
votes

I was able to successfully parse the pubsub string by defining a function that loads it into a json object (see parse_pubsub()). One weird issue I encountered was that I was not able to import json at the global scope. I was receiving "NameError: global name 'json' is not defined" errors. I had to import json within the function.

See my working code below:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window

'''Normalize pubsub string to json object'''
# Lines look like this:
  # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['mac']), (record['status']), (record['datetime'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  parser.add_argument(
      '--output_table', required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                | beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
0
votes

Data written to Python SDK's BigQuery sink should be in the form of a dictionary where each key of the dictionary gives a field of the BigQuery table and corresponding value gives the value to be written to that field. For a BigQuery RECORD type, value itself should be a dictionary with corresponding key,value pairs.

I filed a JIRA to improve documentation of corresponding python module in Beam: https://issues.apache.org/jira/browse/BEAM-3090

0
votes

I have a similar usecase (reading rows as strings from PubSub, converting them to dicts and then processing them).

I am using ast.literal_eval(), which seems to work for me. This command will evaluate the string, but in a safer way than eval() (see here). It should return a dict whose keys are strings, and values are evaluated to the most likely type (int, str, float...). You may want to make sure the values take the correct type though.

This would give you a pipeline like this

import ast
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
            | "JSON row to dict" >> beam.Map(
                        lambda s: ast.literal_eval(s))
            | beam.io.WriteToBigQuery( ... )
        )

I have not used BigQuery (yet), so I cannot help you on the last line, but what you wrote seems correct at first glance.