3 votes

I am running an Apache Beam Dataflow job which reads from a bucket, performs some transformations and writes to BigQuery. However, the records end up in the streaming buffer.

validated_data = (p1
                  | 'Read files from Storage ' + url >> beam.io.ReadFromText(url)
                  | 'Validate records ' + url >> beam.Map(data_ingestion.validate, url)
                                                     .with_outputs(SUCCESS_TAG_KEY, FAILED_TAG_KEY, main="main")
                  )
all_data, _, _ = validated_data
success_records = validated_data[SUCCESS_TAG_KEY]
failed_records = validated_data[FAILED_TAG_KEY]


(success_records
 | 'Extracting row from tagged row {}'.format(url) >> beam.Map(lambda row: row['row'])
 | 'Write to BigQuery table for {}'.format(url) >> beam.io.WriteToBigQuery(
            table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
)

Actually, I need to delete the partition before running the above pipeline, as a way to avoid duplicate records in the ingestion-time partitioned table.

If I run this job more than once for the same file without truncating the table, the table ends up with duplicate records.

And because the most recent records are still in the streaming buffer, the delete-partition command does not actually remove the partition. Below is the code I am using to truncate the table; it runs before the pipeline.

from google.cloud import bigquery

client = bigquery.Client()
dataset = TABLE_MAP['dataset']
table = TABLE_MAP[sentiment_pipeline][table_type]['table']
# Partition decorator <table>$YYYYMMDD targets a single ingestion-time partition
table_id = "{}${}".format(table, format_date(run_date, '%Y%m%d'))
table_ref = client.dataset(dataset).table(table_id)
output = client.delete_table(table_ref)
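
One way to guard against this (a minimal sketch only, reusing the dataset, table and table_id variables above and relying on the client library's Table.streaming_buffer property) would be to check whether the base table still has an active streaming buffer before issuing the delete:

from google.cloud import bigquery

client = bigquery.Client()

# Inspect the *base* table (no $YYYYMMDD suffix); streaming_buffer is None once
# all streamed rows have been flushed into managed storage.
base_table = client.get_table(client.dataset(dataset).table(table))

if base_table.streaming_buffer is None:
    # Nothing pending in the streaming buffer, so the partition delete should stick.
    client.delete_table(client.dataset(dataset).table(table_id))
else:
    # Streamed rows are still being flushed; deleting the partition now would not
    # remove them, so retry later instead.
    print("Streaming buffer still active; retry the partition delete later.")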
You can control how data is inserted into BigQuery through these options: FILE_LOADS (beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/…) and STREAMING_INSERTS (beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/…). - Felipe Hoffa
I am not sure if the same is available for the Apache Beam Python SDK! - Deepak Verma
Hello Deepak. In fact, for batch pipelines Dataflow only supports file loads, and for streaming pipelines it only supports streaming inserts. On the other hand, if you are using a direct runner, then unfortunately only streaming inserts are supported no matter what. What runner are you using, and why are you re-running a batch pipeline? - Pablo
Hi Pablo, I am using the Dataflow runner. I believe the Python SDK for Apache Beam only has functionality for streaming inserts into BigQuery; internally it uses the insertAll method, which only inserts data in streaming mode. The Apache Beam Python SDK does not have any option to specify a load job. - Deepak Verma
I am basically talking about this github.com/apache/beam/commit/… . But as of now this is only available in the git repo of Apache Beam; the Apache Beam version available from pip does not have these changes. As of writing this comment, pip only supports apache-beam==2.10, and the beam-release tag for the same does not have these changes. - Deepak Verma

1 Answer

0 votes

According to the BigQuery documentation, you may have to wait up to 30 minutes before issuing a DML statement against a recently streamed table, and schema changes such as deleting or truncating tables might result in data loss in some scenarios. Here are some workarounds you could try for dealing with duplicates in a streaming scenario.
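
As an illustration of one such workaround (a sketch only: the my_project.my_dataset.my_table reference and the id column used to spot duplicates are placeholders, since the question does not name a unique key), duplicates can be filtered out with a ROW_NUMBER query:

from google.cloud import bigquery

client = bigquery.Client()

# Keep a single row per (placeholder) id; adapt PARTITION BY to whatever
# identifies a duplicate in your data.
dedup_query = """
    SELECT * EXCEPT(row_num)
    FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id) AS row_num
        FROM `my_project.my_dataset.my_table`
    )
    WHERE row_num = 1
"""
rows = client.query(dedup_query).result()

The results could also be written back over the partition by passing a QueryJobConfig with a destination table and a WRITE_TRUNCATE write disposition.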

Additionally, Apache Beam and Dataflow now support batch inserts (file loads) for Python, so this might be a good way to avoid the streaming limitations.
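
For example (a sketch, assuming a Beam Python SDK release that already includes the file-loads support referenced in the comments; the pipeline variables are the ones from the question), the write method can be requested explicitly:

import apache_beam as beam

(success_records
 | 'Extract row' >> beam.Map(lambda row: row['row'])
 | 'Write via load job' >> beam.io.WriteToBigQuery(
            table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            # Ask for batch load jobs instead of streaming inserts.
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS)
)

With load jobs the rows never pass through the streaming buffer, so the partition delete described in the question behaves as expected.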