
I'm trying to create a streaming pipeline with Dataflow that reads messages from a Pub/Sub topic and writes them to a BigQuery table. I don't want to use any template. For the moment I just want to create the pipeline in a Python 3 script executed from a Google Cloud VM instance, carrying out this simple process without any transformation of the data that arrives from Pub/Sub (the structure of the messages matches what the table expects).

The messages published to the Pub/Sub topic look like this:

data = '{"A":1, "B":"Hey", "C":"You"}'
message = data.encode('utf-8')
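
For context, this is roughly how the messages get published (a minimal sketch assuming the google-cloud-pubsub client library; the project and topic names are the same ones used in the pipeline below):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("real-demand", "Test_API")

# Serialize the record as JSON and publish it as UTF-8 bytes
data = json.dumps({"A": 1, "B": "Hey", "C": "You"}).encode("utf-8")
future = publisher.publish(topic_path, data)
print(future.result())  # message ID once the publish succeeds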

This is the code I'm using for the pipeline:

pipeline_options = PipelineOptions(pipeline_args=None, streaming=True,
                                   save_main_session=True)
table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))))

# table_schema = {"fields": [{"type": "INTEGER", "name": "A", "mode": "REQUIRED"},
#                            {"type": "STRING", "name": "B", "mode": "NULLABLE"},
#                            {"type": "STRING", "name": "C", "mode": "NULLABLE"}]}


with beam.Pipeline(options=pipeline_options) as p:

    # Read from the Pub/Sub topic and write the messages into a BigQuery table

    message = (p | beam.io.ReadFromPubSub(topic="projects/real-demand/topics/Test_API",
                                          subscription=None)
                 | beam.io.WriteToBigQuery(table='$Table', dataset='$Dataset',
                                           project='$Project', schema=table_schema)
               )

I get the following error:

 AttributeError: 'str' object has no attribute 'items'
Comments:

Lauren: It looks like you're encoding your message data with 'uft-8'; did you mean to use 'utf-8'?
JPMC: Yes, sorry! I meant to use 'utf-8'.

1 Answer


You are passing in a raw string, not a parsed JSON object. WriteToBigQuery expects each element to be a dictionary mapping column names to values, which is where the "'str' object has no attribute 'items'" error comes from. You need to parse the input string as JSON first, as below:

def parse_pubsub(line):
    """Parse a JSON message from Pub/Sub into an (A, B, C) tuple."""
    import json
    record = json.loads(line)
    return record['A'], record['B'], record['C']

and in your pipeline you would do something like this (note that tuple-unpacking lambdas such as lambda (a, b, c): ... are Python 2 only, so the dict is built by indexing into the tuple instead):

  lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
              | beam.Map(parse_pubsub)
              | beam.Map(lambda fields: {'A': fields[0], 'B': fields[1], 'C': fields[2]})
              | beam.io.WriteToBigQuery(
                  known_args.output_table,
                  schema='A:INTEGER, B:STRING, C:STRING',
                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
          )
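
Putting it all together, here is a minimal end-to-end sketch of the streaming pipeline (assuming apache-beam[gcp] is installed; the topic and table spec are placeholders to replace with your own, and beam.io.ReadFromPubSub plus an explicit decode is used since ReadStringsFromPubSub is deprecated in recent SDKs):

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def run():
    options = PipelineOptions(save_main_session=True)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (
            p
            # Messages arrive as raw bytes from Pub/Sub
            | beam.io.ReadFromPubSub(topic="projects/real-demand/topics/Test_API")
            # Decode and parse each message into a dict of column values
            | beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
            # WriteToBigQuery consumes dicts keyed by column name
            | beam.io.WriteToBigQuery(
                "$Project:$Dataset.$Table",
                schema="A:INTEGER, B:STRING, C:STRING",
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )


if __name__ == "__main__":
    run()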