Is there any Python template/script (existing or roadmap) for Dataflow/Beam to read from PubSub and write to BigQuery? As per the GCP documentation, there is only a Java template.
Thanks !
Is there any Python template/script (existing or roadmap) for Dataflow/Beam to read from PubSub and write to BigQuery? As per the GCP documentation, there is only a Java template.
Thanks !
You can find an example here Pub/Sub to BigQuery sample with template:
An Apache Beam streaming pipeline example.
It reads JSON encoded messages from Pub/Sub, transforms the message data, and writes the results to BigQuery.
Here's another example that shows how to handle invalid message from pubsub into a different table in Bigquery :
class ParseMessage(beam.DoFn):
OUTPUT_ERROR_TAG = 'error'
def process(self, line):
"""
Extracts fields from json message
:param line: pubsub message
:return: have two outputs:
- main: parsed data
- error: error message
"""
try:
parsed_row = _ # parse json message to corresponding bgiquery table schema
yield data_row
except Exception as error:
error_row = _ # build you error schema here
yield pvalue.TaggedOutput(self.OUTPUT_ERROR_TAG, error_row)
def run(options, input_subscription, output_table, output_error_table):
"""
Build and run Pipeline
:param options: pipeline options
:param input_subscription: input PubSub subscription
:param output_table: id of an output BigQuery table
:param output_error_table: id of an output BigQuery table for error messages
"""
with beam.Pipeline(options=options) as pipeline:
# Read from PubSub
rows, error_rows = \
(pipeline | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
# Adapt messages from PubSub to BQ table
| 'Parse JSON messages' >> beam.ParDo(ParseMessage()).with_outputs(ParseMessage.OUTPUT_ERROR_TAG,
main='rows')
)
_ = (rows | 'Write to BigQuery'
>> beam.io.WriteToBigQuery(output_table,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
)
)
_ = (error_rows | 'Write errors to BigQuery'
>> beam.io.WriteToBigQuery(output_error_table,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
)
)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_subscription', required=True,
help='Input PubSub subscription of the form "/subscriptions/<PROJECT>/<SUBSCRIPTION>".')
parser.add_argument(
'--output_table', required=True,
help='Output BigQuery table for results specified as: PROJECT:DATASET.TABLE or DATASET.TABLE.')
parser.add_argument(
'--output_error_table', required=True,
help='Output BigQuery table for errors specified as: PROJECT:DATASET.TABLE or DATASET.TABLE.')
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
run(pipeline_options, known_args.input_subscription, known_args.output_table, known_args.output_error_table)