I'm trying to create a streaming pipeline with Dataflow that reads messages from a Pub/Sub topic and writes them to a BigQuery table. I don't want to use any template. For the moment I just want to build the pipeline in a Python 3 script executed from a Google VM instance, carrying out this simple process without any transformation of the data that arrives from Pub/Sub (the structure of the messages already matches what the table expects).
The messages published to the Pub/Sub topic look like this:
data = '{"A":1, "B":"Hey", "C":"You"}'
message = data.encode('utf-8')
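A minimal publisher that produces a message in that format would look roughly like this (a sketch using the google-cloud-pubsub client; the topic path is taken from the pipeline code below):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = "projects/real-demand/topics/Test_API"

# Same payload as above, serialized to JSON and encoded as UTF-8 bytes
data = json.dumps({"A": 1, "B": "Hey", "C": "You"}).encode("utf-8")
future = publisher.publish(topic_path, data)
print(future.result())  # message ID, returned once the publish succeeds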
This is the code I'm using for the pipeline:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

pipeline_options = PipelineOptions(streaming=True, save_main_session=True)

# Load the table schema from a local JSON file and convert it to a TableSchema
table_schema = parse_table_schema_from_json(
    json.dumps(json.load(open("schema.json"))))
# schema.json contains:
# {"fields": [{"type": "INTEGER", "name": "A", "mode": "REQUIRED"},
#             {"type": "STRING", "name": "B", "mode": "NULLABLE"},
#             {"type": "STRING", "name": "C", "mode": "NULLABLE"}]}
with beam.Pipeline(options=pipeline_options) as p:
    # Read from the Pub/Sub topic and write the messages into a BigQuery table
    message = (
        p
        | beam.io.ReadFromPubSub(topic="projects/real-demand/topics/Test_API",
                                 subscription=None)
        | beam.io.WriteToBigQuery(table='$Table', dataset='$Dataset',
                                  project='$Project', schema=table_schema)
    )
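The snippet above leaves the runner and GCP options at their defaults; this is roughly how the Dataflow-specific options get supplied when running from the VM (a sketch only; the runner, project, region and temp_location values are placeholders, not my actual setup):

# Sketch: Dataflow-specific options passed directly to PipelineOptions
# (all values below are placeholders)
pipeline_options = PipelineOptions(
    streaming=True,
    save_main_session=True,
    runner="DataflowRunner",
    project="$Project",
    region="$Region",
    temp_location="gs://$Bucket/tmp",
)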
When I run the pipeline, I get the following error:
AttributeError: 'str' object has no attribute 'items'