I am using an Apache Beam pipeline and I want to batch-insert into BigQuery with Python. My data comes from Pub/Sub, which is unbounded. From my research, GlobalWindows with triggers should solve my problem. I tried my pipeline with windowing, but it still does streaming inserts. My pipeline code is the following:

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, Repeatedly)

p2 = (p
      | 'Read' >> beam.io.ReadFromPubSub(subscription=subscription_path,
                                         with_attributes=True,
                                         timestamp_attribute=None,
                                         id_label=None)
      | 'Windowing' >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=Repeatedly(
                AfterAny(AfterCount(100),
                         AfterProcessingTime(1 * 60))),
            accumulation_mode=AccumulationMode.DISCARDING)
      | 'Process' >> beam.Map(getAttributes))

p3 = (p2
      | 'Filter' >> beam.Filter(lambda msg: "xx" in msg and msg["xx"].lower() == "true")
      | 'Delete' >> beam.Map(deleteAttribute)
      | 'Write' >> writeTable(bq_table_test, bq_batch_size))

def writeTable(table_name, batch_size):
    return beam.io.WriteToBigQuery(
        table=table_name,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        batch_size=batch_size)

I'm checking from the Billing Reports whether the inserts are batch or streaming. When the streaming insert usage increases, I understand that the batch insertion did not happen. Is there another way to check whether an insertion was streamed or batched? And how can I do batch inserts to BigQuery?

1 Answer

According to the documentation, you cannot specify the insert type; it is chosen automatically based on your input PCollection:

The Beam SDK for Python does not currently support specifying the insertion method.

BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for load jobs and streaming inserts for more information about these tradeoffs.

BigQueryIO chooses a default insertion method based on the input PCollection.

BigQueryIO uses load jobs when you apply a BigQueryIO write transform to a bounded PCollection.

BigQueryIO uses streaming inserts when you apply a BigQueryIO write transform to an unbounded PCollection.

In your case you're reading from an unbounded source (Pub/Sub), so the writes will always be streaming inserts. Windowing does not change the bounded or unbounded nature of the PCollection.

One workaround I can think of is to split the pipeline: a streaming pipeline writes the messages to a set of files in some storage (e.g. GCS), and a second pipeline reads those files (which form a bounded source) and loads them into BigQuery.
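
A minimal sketch of what that second, batch pipeline could look like, assuming the streaming pipeline stages the messages as newline-delimited JSON files under a GCS prefix. The bucket path, table name, schema and parse_line helper are placeholders, not part of your pipeline. Because ReadFromText yields a bounded PCollection, WriteToBigQuery is executed as load jobs here:

import json

import apache_beam as beam

def parse_line(line):
    # Hypothetical helper: each staged file line is assumed to hold one JSON record.
    return json.loads(line)

with beam.Pipeline() as p:
    (p
     # Reading files that already exist is a bounded source, so the write
     # below runs as a BigQuery load job rather than streaming inserts.
     | 'ReadStagedFiles' >> beam.io.ReadFromText('gs://my-bucket/staging/*.json')
     | 'Parse' >> beam.Map(parse_line)
     | 'Load' >> beam.io.WriteToBigQuery(
           table='project:dataset.table_test',  # placeholder table
           schema='xx:STRING,payload:STRING',   # placeholder schema
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

The streaming half of the workaround would be your existing Pub/Sub pipeline, except that instead of writing to BigQuery it writes windowed files to the same staging prefix.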