
When I write to a partitioned table in BigQuery from Dataflow, I get the following error. Could anyone help me with this?

Invalid table ID "test$20181126". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used.

This is the Python snippet I'm using for the write:

import apache_beam as beam


class BQWriter(beam.PTransform):
    def __init__(self, table, schema):
        super(BQWriter, self).__init__()
        self.table = table
        self.schema = schema

    def expand(self, pcoll):
        return pcoll | beam.io.Write(beam.io.BigQuerySink(
            self.table,
            schema=self.schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

I'm applying it to create the table like below:

a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)

2 Answers

1 vote

I had the same problem. My solution was to:

  • Either add a date column to the data, then set the BQ table to be partitioned on it (first sketch below)

  • Or create the table as ingestion-time partitioned, so BigQuery partitions on the _PARTITIONTIME pseudo-column (second sketch below).

Both of these options mean you only ever insert into test-123:test.test, with no $ decorator.
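For the first option, a minimal sketch. Names like rows and table_schema are placeholders for your own PCollection and schema, and it assumes a Beam release recent enough that WriteToBigQuery accepts a timePartitioning spec via additional_bq_parameters:

import apache_beam as beam

# Write to the plain table ID (no $ decorator) and ask BigQuery to create
# the table partitioned on a 'date' column carried in each row.
rows | 'WriteBQ' >> beam.io.WriteToBigQuery(
    'test-123:test.test',
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    additional_bq_parameters={
        'timePartitioning': {'type': 'DAY', 'field': 'date'}})

For the second option, a one-off setup sketch using the google-cloud-bigquery client library (run once, outside the pipeline):

from google.cloud import bigquery

# Create test-123:test.test as ingestion-time partitioned; streamed rows
# then land in the partition for their arrival day (_PARTITIONTIME).
client = bigquery.Client(project='test-123')
table = bigquery.Table('test-123.test.test')
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY)
client.create_table(table)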

As to whether you should be able to do what you were trying, it seems the answer is yes: the Beam JIRA says it was fixed for Java, but I couldn't find the status for Python.

0 votes

The best way to do that is to pass a function to the built-in beam.io.WriteToBigQuery transform:

from datetime import date


def table_fn(element):
    # bq_output_table is defined elsewhere, e.g. "project:dataset.table".
    current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
    return f"{bq_output_table}${current_date}"

user_parent_user_watchever_pcol | "Write to BigQuery" >> beam.io.WriteToBigQuery(
    table_fn,
    schema=schemas.VIDEOCATALOG_SCHEMA,
    method="STREAMING_INSERTS",
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)
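As far as I can tell, this works because WriteToBigQuery resolves the destination per element, and with STREAMING_INSERTS BigQuery's streaming API accepts $YYYYMMDD partition decorators on ingestion-time partitioned tables. The legacy BigQuerySink in the question validates a single table ID up front and rejects decorators, which is the error you saw. Note that with CREATE_NEVER the partitioned table has to exist before the pipeline runs.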