2 votes

I'm working on an ETL pipeline that pulls data from a database, applies minor transformations, and outputs to BigQuery. I have written my pipeline in Apache Beam 2.26.0 using the Python SDK. I'm loading a dozen or so tables, and I'm passing their names as arguments to beam.io.WriteToBigQuery.

Now, the documentation (https://beam.apache.org/documentation/io/built-in/google-bigquery) says:

When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER.

I believe this is not exactly true. In my tests I saw that this is the case only when passing a static table name.

If you have a bunch of tables and want to pass the table name as an argument, it throws an error:

ErrorProto message: 'No schema specified on job or table.'

My code:

    bq_data | "Load data to BQ" >> beam.io.WriteToBigQuery(
                      table=lambda row: bg_config[row['table_name']],
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                      create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
                  )

bq_data contains dicts built from rows of a pandas DataFrame, each of which has a table_name column. bq_config is a dictionary where the key is row['table_name'] and the value is of the format:

[project_id]:[dataset_id].[table_id]
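
For illustration, a minimal sketch of what bq_config and the table callable might look like; the project, dataset, and table names here are hypothetical:

    # Hypothetical mapping from table_name values to fully qualified table specs
    bq_config = {
        'orders': 'my-project:my_dataset.orders',
        'customers': 'my-project:my_dataset.customers',
    }

    # Each element of bq_data carries its own destination via its table_name column
    table_fn = lambda row: bq_config[row['table_name']]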

Anyone have some thoughts on this?

1

you need to pass a schema that is also dynamic: a lambda or a function that returns the schema for a given table – Pablo
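
Following that comment, a hedged sketch of what a dynamic schema might look like. schema_config is a hypothetical dict mapping destination table specs to schema strings, and this assumes my reading of the Beam docs is right that, when table is a callable, schema may also be a callable that receives the destination returned by the table callable:

    # Hypothetical mapping: destination table spec -> schema string
    schema_config = {
        'my-project:my_dataset.orders': 'order_id:INTEGER,amount:FLOAT',
        'my-project:my_dataset.customers': 'customer_id:INTEGER,name:STRING',
    }

    bq_data | "Load data to BQ" >> beam.io.WriteToBigQuery(
        table=lambda row: bq_config[row['table_name']],
        # The schema callable receives the destination produced by the table callable
        schema=lambda dest: schema_config[dest],
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
    )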

1 Answer

0 votes

Have a look at this thread; I addressed it there. In short, I used Python's built-in time/date functions to render a variable before executing the Python BigQuery API request.
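
A minimal sketch of that idea, assuming the goal is to render a date-suffixed table name with Python's datetime before calling the BigQuery client; the project, dataset, and table names and the row contents are hypothetical:

    from datetime import datetime, timezone
    from google.cloud import bigquery

    # Render the destination table name up front using the current date
    suffix = datetime.now(timezone.utc).strftime('%Y%m%d')
    table_id = f'my-project.my_dataset.events_{suffix}'  # hypothetical names

    rows = [{'event': 'click', 'ts': datetime.now(timezone.utc).isoformat()}]

    client = bigquery.Client()
    errors = client.insert_rows_json(table_id, rows)  # table is assumed to already exist
    if errors:
        raise RuntimeError(f'Insert failed: {errors}')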