I am using an Apache Beam pipeline and I want to batch-insert into BigQuery with Python. My data comes from Pub/Sub, which is unbounded. Based on my research, GlobalWindows with triggers should solve my problem. I tried my pipeline with windowing, but it still does streaming inserts. My pipeline code is the following:
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, Repeatedly)

p2 = (p
      | 'Read' >> beam.io.ReadFromPubSub(subscription=subscription_path,
                                         with_attributes=True,
                                         timestamp_attribute=None,
                                         id_label=None)
      | 'Windowing' >> beam.WindowInto(window.GlobalWindows(),
                                       trigger=Repeatedly(
                                           AfterAny(AfterCount(100),
                                                    AfterProcessingTime(1 * 60))),
                                       accumulation_mode=AccumulationMode.DISCARDING)
      | 'Process' >> beam.Map(getAttributes))

p3 = (p2
      | 'Filter' >> beam.Filter(lambda msg: "xx" in msg and msg["xx"].lower() == "true")
      | 'Delete' >> beam.Map(deleteAttribute)
      | 'Write' >> writeTable(bq_table_test, bq_batch_size))
def writeTable(table_name, batch_size):
    return beam.io.WriteToBigQuery(
        table=table_name,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        batch_size=batch_size)
I'm checking in the Billing Reports whether the inserts are billed as batch or streaming: when streaming-insert usage increases, I take that to mean the bulk insertion did not happen. Is there another way to check whether an insert went through streaming or batch? And how can I do batch inserts to BigQuery?
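For the first question, one check I'm aware of (a sketch, not something I've wired into this pipeline): rows that arrive via streaming inserts sit for a while in the table's streaming buffer, which the BigQuery client library exposes, so a populated streaming_buffer suggests streaming inserts happened. The table ID below is a placeholder:

from google.cloud import bigquery

client = bigquery.Client()
# "project.dataset.table" is a placeholder for my actual table.
table = client.get_table("project.dataset.table")
# streaming_buffer is None when nothing is pending in the streaming buffer;
# a populated buffer indicates recent streaming inserts.
print(table.streaming_buffer)

For the second question, my current reading of the WriteToBigQuery docs is that the insert path is chosen by the method parameter rather than by the windowing, so a load-job version of my write step might look like the sketch below. This is unverified on my side: FILE_LOADS and triggering_frequency are my interpretation of the docs, and the pipeline would also need a GCS temp location for staging the files.

def writeTableBatch(table_name):
    # Sketch: stage rows to GCS and run BigQuery load jobs instead of
    # streaming inserts; triggering_frequency (seconds) sets how often
    # a load job is kicked off on an unbounded source.
    return beam.io.WriteToBigQuery(
        table=table_name,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
        triggering_frequency=60)

Is this the right direction, or does the windowing approach above have a fix I'm missing?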