
Imagine a simple Google Dataflow pipeline. In this pipeline you read from BQ using an Apache Beam source and, depending on the returned PCollection, you have to update those rows:

Journeys = (p
            | 'Read from BQ' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = (Journeys
          | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
         | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

The problem with this pipeline is that UpdateBQ is executed once for every element in the returned PCollection (beam.Map), so a separate UPDATE job is fired per row, which is slow and can run into BigQuery's quotas on DML jobs.


What would be a better way to perform an update on a BigQuery table?

I suppose this could be done without beam.Map, executing only one UPDATE that processes the whole input PCollection at once.
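One way to get that behaviour (a minimal sketch of my own, not from a tested pipeline) is to collapse the PCollection to a single element with a global combine, so the update fires exactly once, after all rows have been read:

single_update = (Journeys
                 | 'To single element' >> beam.combiners.Count.Globally()  # one-element PCollection
                 | 'Run one UPDATE' >> beam.Map(lambda _: UpdateBQ()))     # side effect runs once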


Extra

def UpdateBQ(element=None):
    # Fires a single UPDATE job against BigQuery. The element argument is
    # unused; the default lets the function also be called standalone,
    # outside of beam.Map.
    from google.cloud import bigquery

    client = bigquery.Client()
    query = """UPDATE table SET Field = 'YYY' WHERE Field2 = 'XXX'"""
    query_job = client.query(query)  # API request; standard SQL by default
    query_job.result()               # block until the UPDATE job completes

Possible solution

with beam.Pipeline(options=options) as p:
    Journeys = (p
                | 'Read from BQ' >> beam.io.Read(
                    beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

    Write = (Journeys
             | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))


UpdateBQ()

Because the call sits outside the with block, it runs exactly once, after the pipeline has finished.
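Equivalently (a sketch assuming the same pipeline objects as above), without the with block you would have to wait for the run to finish explicitly before firing the update:

p = beam.Pipeline(options=options)
# ... build Journeys and Write as above ...
result = p.run()
result.wait_until_finish()  # make sure every row has been written
UpdateBQ()                  # then run the single UPDATE job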

1 Answer


Are you doing any further transformation in the Beam pipeline after reading from BQ, or is it just what you showed in the code, i.e. read from BQ and then fire an update command against BQ? In that case you don't need Beam at all. Just use a BQ query to update data in one table using another table. BQ best practices suggest avoiding single-row inserts/updates.
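For example (a sketch with placeholder table and column names, not from the original question): write the pipeline output to a staging table with WriteToBigQuery, then run one set-based MERGE job that touches every affected row at once:

from google.cloud import bigquery

# Placeholder names: dataset.journeys is the target table,
# dataset.journeys_staging is where the pipeline wrote its output.
merge_sql = """
    MERGE dataset.journeys AS t
    USING dataset.journeys_staging AS s
    ON t.JourneyId = s.JourneyId
    WHEN MATCHED THEN
      UPDATE SET t.Field = s.Field
    WHEN NOT MATCHED THEN
      INSERT (JourneyId, Field) VALUES (s.JourneyId, s.Field)
"""

client = bigquery.Client()
client.query(merge_sql).result()  # one DML job instead of one per row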