
Given a Pub/Sub topic, BigQuery lets you stream its messages to a table using Dataflow SQL syntax.

Let's say you publish the message {"a": 1, "b": 2, "c": 3} to a topic. In BigQuery, with the Dataflow engine selected, you would need to define the my_topic schema as:

Step 1

event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64

You then create a Dataflow streaming job with the following command, so that every incoming message is streamed to a destination BigQuery table.

Step 2

gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
  --job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
  --bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic

gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2, "c": 3}'
bq query --nouse_legacy_sql \
  'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'

+---------------------+-----+-----+-----+
|   event_timestamp   |  a  |  b  |  c  |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 |  1  |  2  |  3  |
+---------------------+-----+-----+-----+

At Step 2, I would also like to send --attribute="origin=gcloud,username=gcp" to the Pub/Sub topic. Is it possible to define the schema at Step 1 so that the attributes are written to the table automatically?

I have been trying different things:

  • Declaring attributes: STRUCT in the schema, following this Beam extensions documentation (roughly as in the sketch after this list), but all I get are JSON parsing errors in Dataflow
  • gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2}' --attribute='c=3' expecting the message to be flattened as in this piece of code, but I get a NULL value for c in the resulting table.
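For reference, the schema variant from the first bullet looked roughly like this. The STRUCT field names mirror the attributes I want to send, and the exact syntax is my guess from the Beam docs, since Dataflow SQL rejects it:

event_timestamp: TIMESTAMP
attributes: STRUCT<origin STRING, username STRING>
a: INT64
b: INT64
c: INT64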

Thank you.

I can't achieve the same thing either. It may be impossible! – guillaume blaquiere
The only way to achieve the same behavior seems to be using a WHERE clause on the topic schema's fields to filter messages in the Dataflow SQL job. Dataflow SQL lacks the ability to filter on attributes the way subscriptions can. – Michel Hua

1 Answer


Pub/Sub attributes are of MAP type, but MAP is not one of Dataflow SQL's supported types. There have been discussions about adding support, but I don't know the current status of that.

If the attributes are important to you, I suggest writing a custom Beam pipeline using ReadFromPubSub, along the lines of the sketch below.
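A minimal sketch using the Beam Python SDK, reusing the project, topic, and table names from your question. The flattening logic and the extra schema fields (origin, username) are illustrative assumptions, not a fixed recipe:

import json

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


def flatten(msg):
    # Merge the JSON payload with the message attributes into one flat row.
    row = json.loads(msg.data.decode("utf-8"))
    row.update(msg.attributes or {})  # e.g. {"origin": "gcloud", "username": "gcp"}
    return row


options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read" >> ReadFromPubSub(
            topic="projects/my_project/topics/my_topic",
            with_attributes=True,  # yield PubsubMessage objects instead of raw bytes
        )
        | "Flatten" >> beam.Map(flatten)
        | "Write" >> beam.io.WriteToBigQuery(
            "my_project:staging.my_topic",
            schema="a:INT64,b:INT64,c:INT64,origin:STRING,username:STRING",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )

With with_attributes=True, the read step yields PubsubMessage objects, so both the payload and the attributes are available in the same DoFn and you can decide yourself how to map them to table columns.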