
I am trying to create a Dataflow script that goes from BigQuery back to BigQuery. Our main table is massive and has multiple nested fields, which breaks the extract capability. I'd like to create a simple table that can be extracted and contains all the relevant information.

The SQL query 'Select * from table.orders where paid = false limit 10' is a simple one just to make sure it works. The main query connects to multiple tables within the same project.

This seems to work, but what can I do to test it out? Also, how can I get this to run automatically every morning?

Thank you

from __future__ import absolute_import

import logging

import apache_beam as beam

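# GCP project and GCS bucket/prefix used for Dataflow staging and temp files.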
PROJECT='experimental'
BUCKET='temp1/python2'


def run():
    argv = [
            '--project={0}'.format(PROJECT),
            '--job_name=test1',
            '--save_main_session',
            '--staging_location=gs://{0}/staging/'.format(BUCKET),
            '--temp_location=gs://{0}/staging/'.format(BUCKET),
            '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Read the table rows into a PCollection.
        rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
                query='SELECT * FROM `table.orders` WHERE paid = false LIMIT 10',
                use_standard_sql=True))

        # Write the output using a "Write" transform that has side effects.
        rows  | 'Write' >> beam.io.WriteToBigQuery(
                table='orders_test',
                dataset='external',
                project='experimental',
                schema='field1:type1,field2:type2,field3:type3',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
What do you mean you'd like to test it out? If you want to verify that it works, you can try running it on a smaller data set. Or are you talking about unit tests? As for running daily, this blog has a good rundown: cloud.google.com/blog/big-data/2016/04/… – Lara Schmidt
Do you actually need Dataflow for this? You can use the bq command-line tool to execute a query and specify a destination table: bq query --destination_table [table]. Reference: bq command-line tool. – Andrew Nguonly
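(For reference, that query-into-a-destination-table approach can also be done from Python with the BigQuery client library instead of Dataflow. A minimal sketch, assuming the google-cloud-bigquery package is installed and reusing the project/dataset/table names from the question as placeholders:

from google.cloud import bigquery

client = bigquery.Client(project='experimental')

# Configure the query job to write its results into the destination table.
job_config = bigquery.QueryJobConfig()
job_config.destination = bigquery.TableReference.from_string(
    'experimental.external.orders_test')
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    'SELECT * FROM `table.orders` WHERE paid = false LIMIT 10',
    job_config=job_config)
query_job.result()  # wait for the query to finish writing the destination table
)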

1 Answer


Running daily: https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions
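One way to wire up the daily run (not necessarily the exact approach from the blog post) is a small Cloud Function that launches the pipeline from a staged Dataflow template. This is only a sketch: it assumes the pipeline has already been staged as a template (for example with --template_location), that google-api-python-client is available in the function's runtime, and the function name, job name, and template path below are placeholders, not anything from the question.

from googleapiclient.discovery import build


def launch_daily_job(event, context):
    # Entry point for a Pub/Sub-triggered Cloud Function; publish the
    # triggering message once per morning (e.g. from a cron-style scheduler).
    service = build('dataflow', 'v1b3')
    request = service.projects().templates().launch(
        projectId='experimental',                        # placeholder project id
        gcsPath='gs://temp1/python2/templates/bq_to_bq',  # placeholder template path
        body={'jobName': 'bq-to-bq-daily'})
    return request.execute()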

Testing - you can try running against a smaller data set to check that the pipeline works. If you were running user code (not just a read and a write), you could test it by using data from a file and checking the expected results. But since you are only doing a read and a write, you would need to test against BigQuery itself.
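If you do add per-row logic later, here is a minimal sketch of that file/in-memory style of test using Beam's testing utilities. The keep_unpaid filter is hypothetical, just so there is something to assert on; the idea is to swap the BigQuery read for beam.Create with known rows.

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def keep_unpaid(row):
    # Hypothetical per-row logic standing in for whatever transform you add.
    return not row['paid']


def test_keep_unpaid():
    rows = [
        {'order_id': 1, 'paid': True},
        {'order_id': 2, 'paid': False},
    ]
    with TestPipeline() as p:
        output = (p
                  | beam.Create(rows)            # in-memory stand-in for the BigQuery read
                  | beam.Filter(keep_unpaid))
        assert_that(output, equal_to([{'order_id': 2, 'paid': False}]))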