I'm currently having trouble getting my Beam pipeline on Dataflow to write data from Pub/Sub into BigQuery. I've stepped through the various stages and the data itself appears to be transformed as expected. The problem comes from the step that uses beam.io.gcp.bigquery.WriteToBigQuery. Checking the Stackdriver logs shows:
There were errors inserting to BigQuery: [<InsertErrorsValueListEntry errors: [<ErrorProto debugInfo: '' location: 'root.array[0].thing' message: 'no such field.' reason: 'invalid'>] index: 0>,
I've cut the above log short, but the rest of it is more of the same: missing fields. While the error is accurate in that those fields do not exist in the schema, I pass additional_bq_parameters as:
{
    'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
    'ignoreUnknownValues': True,
    'maxBadRecords': 1000,
}
These additional arguments seem to be ignored whether I pass a callable that returns the above dictionary or set additional_bq_parameters to the dictionary directly.
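To be concrete, here is a minimal sketch of both variants (the table name is a placeholder, the variable names are just for illustration, and the other WriteToBigQuery arguments are omitted for brevity; the full call is in the DoFn below):

import apache_beam as beam

EXTRA_BQ_PARAMS = {
    'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
    'ignoreUnknownValues': True,
    'maxBadRecords': 1000,
}

# Variant 1: pass the dictionary directly.
write_with_dict = beam.io.gcp.bigquery.WriteToBigQuery(
    table='project:dataset.table',  # placeholder
    additional_bq_parameters=EXTRA_BQ_PARAMS)

# Variant 2: pass a callable that returns the dictionary per element.
write_with_callable = beam.io.gcp.bigquery.WriteToBigQuery(
    table='project:dataset.table',  # placeholder
    additional_bq_parameters=lambda _element: EXTRA_BQ_PARAMS)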
The process method of the DoFn I apply with ParDo looks like the following:
def process(self, tuple):
    import apache_beam as beam

    def get_additional_bq_parameters(_element):
        return {
            'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
            'ignoreUnknownValues': True,
            'maxBadRecords': 1000,
        }

    key, data = tuple
    table_name = f"table.{key}"
    (data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=table_name,
        schema=self.schemas[key],  # list of schemas passed to init
        additional_bq_parameters=get_additional_bq_parameters,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS',
        insert_retry_strategy="RETRY_NEVER")
    )
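For context, this DoFn is applied to (key, data) tuples coming off the Pub/Sub read, roughly like this (the class name, parsing function, and message fields here are simplified stand-ins, not my exact code):

import json

import apache_beam as beam


class UploadFn(beam.DoFn):
    """Stand-in name for the DoFn whose process method is shown above."""

    def __init__(self, schemas):
        self.schemas = schemas

    def process(self, tuple):
        ...  # body as shown above


def parse_and_key(message):
    # Hypothetical parsing step: pull a routing key and the row data out of
    # the Pub/Sub message payload.
    record = json.loads(message.decode('utf-8'))
    return record['type'], record['rows']


def build_pipeline(p, schemas):
    return (p
            | 'Read' >> beam.io.ReadFromPubSub(subscription='<subscription>')
            | 'Parse' >> beam.Map(parse_and_key)
            | 'Write' >> beam.ParDo(UploadFn(schemas)))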
When running the pipeline on Dataflow I pass the following arguments:
python3 script.py \
--project=<project> \
--job_name=<name> \
--streaming \
--runner=DataflowRunner \
--temp_location=<temp> \
--experiments=use_beam_bq_sink \
--staging_location=<location> \
--num_workers=2
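Inside script.py these flags are simply handed to PipelineOptions, roughly as follows (a simplified sketch of my setup, not the exact code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    # --project, --runner, --streaming, --experiments, etc. are all parsed
    # directly from the command line by PipelineOptions.
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        pass  # pipeline construction as sketched above


if __name__ == '__main__':
    run()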
If anyone can explain why the additional BigQuery arguments are seemingly not being recognized, I'd greatly appreciate it.
Furthermore, I've tried capturing the result of the write to BigQuery in an attempt to persist the failed rows to GCS. However, when I try to access the failed rows from the resulting PCollection with either results[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS] or results['FailedRows'], I get the error TypeError: 'PCollection' object is not subscriptable. Based on what I've seen, I thought this was the proper approach. If anyone could clarify this as well, I would be really grateful.
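For reference, this is the pattern I was trying to follow, based on the examples I've seen (the table, schema, and input here are placeholders, not my real values):

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn

with beam.Pipeline() as p:
    rows = p | 'Create' >> beam.Create([{'field': 'value'}])
    results = rows | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table='project:dataset.table',
        schema='field:STRING',
        method='STREAMING_INSERTS',
        insert_retry_strategy='RETRY_NEVER')

    failed_rows = results[BigQueryWriteFn.FAILED_ROWS]
    # ... then persist failed_rows to GCS, e.g. with beam.io.WriteToText
    #     (after windowing, since the real pipeline is streaming)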