I'm working on an ETL pipeline that pulls data from a database, applies minor transformations, and writes the results to BigQuery. I wrote the pipeline with the Apache Beam 2.26.0 Python SDK. I'm loading a dozen or so tables, and I'm passing their names as arguments to beam.io.WriteToBigQuery.
Now, the documentation (https://beam.apache.org/documentation/io/built-in/google-bigquery) says:
When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER.
I believe this is not exactly true. In my tests I saw that this holds only when passing a static table name.
If you have a bunch of tables and pass the table name dynamically as a callable instead, it throws an error:
ErrorProto message: 'No schema specified on job or table.'
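For comparison, a static destination with the same dispositions did not need a schema in my tests. A minimal sketch of that working case (the project, dataset, and table names here are placeholders, not my real ones):

bq_data | "Load data to BQ" >> beam.io.WriteToBigQuery(
    table='my-project:my_dataset.my_table',  # static table name, no schema supplied
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
)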
My code with a dynamic table name, which produces the error above:
bq_data | "Load data to BQ" >> beam.io.WriteToBigQuery(
table=lambda row: bg_config[row['table_name']],
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
)
bq_data is a PCollection of dicts, each built from a row of a pandas data frame, and each dict has a table_name key. bq_config is a dictionary whose keys are the row['table_name'] values and whose values are strings of the format:
[project_id]:[dataset_id].[table_id]
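To make that concrete, bq_config looks something like this (the project, dataset, and table names are made-up placeholders):

bq_config = {
    'customers': 'my-project:my_dataset.customers',
    'orders': 'my-project:my_dataset.orders',
    # ... one entry per destination table
}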
Does anyone have any thoughts on this?