I am trying to create a Dataflow script that goes from BigQuery back to BigQuery. Our main table is massive and has multiple nested fields, which breaks the extract capability. I'd like to create a simple table that can be extracted, containing all the relevant information.
The SQL query 'Select * from table.orders where paid = false limit 10' is a simple one just to make sure it works. The main query connects to multiple tables within the same project.
This seems to work, but I'd like to know what I can do to test it out. Also, how can I get this to run automatically every morning?
Thank you
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam

PROJECT = 'experimental'
BUCKET = 'temp1/python2'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=test1',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:
        # Read the table rows into a PCollection.
        rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            query='Select * from `table.orders` where paid = false limit 10',
            use_standard_sql=True))

        # Write the output using a "Write" transform that has side effects.
        rows | 'Write' >> beam.io.WriteToBigQuery(
            table='orders_test',
            dataset='external',
            project='experimental',
            schema='field1:type1,field2:type2,field3:type3',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
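For the testing part, a minimal sketch of how the same pipeline could be run locally with the DirectRunner, assuming local gcloud credentials and access to the gs://temp1/python2 bucket (the orders_test_local destination table name is made up for this sketch):

def run_local():
    # Same read/write logic as above, but executed on the local machine
    # by swapping DataflowRunner for DirectRunner.
    argv = [
        '--project={0}'.format(PROJECT),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DirectRunner'
    ]

    with beam.Pipeline(argv=argv) as p:
        rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            query='Select * from `table.orders` where paid = false limit 10',
            use_standard_sql=True))

        # Write to a throwaway table so the real orders_test table is untouched.
        rows | 'Write' >> beam.io.WriteToBigQuery(
            table='orders_test_local',
            dataset='external',
            project='experimental',
            schema='field1:type1,field2:type2,field3:type3',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

Running on the DirectRunner keeps the 10-row query cheap to iterate on before submitting a full Dataflow job.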
You can use the bq command line tool to execute a query and specify a destination table: bq query --destination_table [table]. Reference: bq command line tool. – Andrew Nguonly
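As a hedged illustration of that comment, the full command for this query might look like the following; the external.orders_test destination and the --replace / --use_legacy_sql=false flags are assumptions chosen to mirror the pipeline's WRITE_TRUNCATE behavior, not part of the original comment:

bq query \
    --project_id=experimental \
    --destination_table=external.orders_test \
    --use_legacy_sql=false \
    --replace \
    'Select * from `table.orders` where paid = false limit 10'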