I'm a bit surprised that I can't find this answer anywhere.

I have the following schema and set-up:

from google.cloud import bigquery

job_config = bigquery.LoadJobConfig(schema = [
    bigquery.SchemaField("foo", "STRING"),
    bigquery.SchemaField("Timestamp", "TIMESTAMP"),
    bigquery.SchemaField("bar", "INT64"),
    bigquery.SchemaField("id", "STRING")
])

load_job = bq_client.load_table_from_dataframe(
    df, '.'.join([PROJECT, DATASET, TABLE]), job_config = job_config
)

load_job.result()

As you can see, I'm loading my DataFrame (df) into BigQuery, and that works fine. However, I'd like to load the DataFrame into a partitioned table, using the Timestamp field to determine which partition each row goes into.

How should I do this?

1 Answer

You can set the time_partitioning property of your LoadJobConfig. The TimePartitioning class is described in the google-cloud-bigquery API reference, and the docs include a similar example. Use TimePartitioning.field to specify which column to partition on. In your case it could look something like this (also adding a 90-day partition expiration):

job_config = bigquery.LoadJobConfig(
    schema = [
        bigquery.SchemaField("foo", "STRING"),
        bigquery.SchemaField("Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("bar", "INT64"),
        bigquery.SchemaField("id", "STRING")
    ],
    time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Timestamp",  # field to use for partitioning
        expiration_ms=7776000000  # 90 days
    )
)
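The expiration_ms value is simply 90 days expressed in milliseconds. As a minimal sketch, it can be derived with the standard library instead of hard-coding the literal (PARTITION_EXPIRATION_MS is a hypothetical name, not part of the BigQuery API):

```python
from datetime import timedelta

# Hypothetical constant: 90 days in milliseconds, the unit TimePartitioning.expiration_ms expects.
# 90 days * 86,400 s/day * 1,000 ms/s = 7,776,000,000 ms
PARTITION_EXPIRATION_MS = int(timedelta(days=90).total_seconds() * 1000)

print(PARTITION_EXPIRATION_MS)  # 7776000000
```

You could then pass expiration_ms=PARTITION_EXPIRATION_MS, which makes the intended retention period easier to read and change.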

Because load_job.result() waits for the job to finish and returns the completed LoadJob, we can use it to verify that everything worked:

result = load_job.result()  # blocks until the load job has finished
print("Written {} rows to {}".format(result.output_rows, result.destination))
print("Partitioning: {}".format(result.time_partitioning))

Output:

Written 4 rows to TableReference(DatasetReference(u'PROJECT_ID', u'test'), 'pandas_partitioned')
Partitioning: TimePartitioning(expirationMs=7776000000,field=Timestamp,type=DAY)

We can also describe the newly created table with the bq command-line tool:

$ bq show test.pandas_partitioned
Table PROJECT_ID:test.pandas_partitioned

   Last modified            Schema            Total Rows   Total Bytes   Expiration                  Time Partitioning                   Clustered Fields   Labels  
 ----------------- ------------------------- ------------ ------------- ------------ -------------------------------------------------- ------------------ -------- 
  21 Dec 10:01:42   |- Timestamp: timestamp   4            107                        DAY (field: Timestamp, expirationMs: 7776000000)                              
                    |- bar: integer                                                                                                                                 
                    |- foo: string                                                                                                                                  
                    |- id: string
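With DAY partitioning on a TIMESTAMP column, each row is placed in the partition for its UTC date, identified by a YYYYMMDD partition ID. The helper below is a hypothetical sketch (not part of google-cloud-bigquery) for previewing where a value will land. It assumes naive datetimes, like the datetime.now() values in the full code, are interpreted as UTC; if your machine is not on UTC, local times near midnight may fall into a different day's partition than you expect.

```python
from datetime import datetime, timedelta, timezone


def partition_id_for(ts: datetime) -> str:
    """Return the daily partition ID (YYYYMMDD) a TIMESTAMP value would fall into.

    Hypothetical helper: naive datetimes are assumed to be UTC, and aware
    datetimes are converted to UTC, because BigQuery draws TIMESTAMP
    partition boundaries in UTC.
    """
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return ts.astimezone(timezone.utc).strftime("%Y%m%d")


# A naive value is taken as-is (assumed UTC).
print(partition_id_for(datetime(2019, 12, 21, 10, 1, 42)))  # 20191221

# 01:00 at UTC+5 is 20:00 UTC on the previous day, so it lands in the earlier partition.
plus_five = timezone(timedelta(hours=5))
print(partition_id_for(datetime(2019, 12, 21, 1, 0, tzinfo=plus_five)))  # 20191220
```

The same ID format is used in partition decorators such as test.pandas_partitioned$20191221.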

Full code:

from datetime import datetime  
from datetime import timedelta 

import pandas as pd
from google.cloud import bigquery


PROJECT = "PROJECT_ID"
DATASET = "test"
TABLE = "pandas_partitioned"

bq_client = bigquery.Client(project=PROJECT)

job_config = bigquery.LoadJobConfig(
    schema = [
        bigquery.SchemaField("foo", "STRING"),
        bigquery.SchemaField("Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("bar", "INT64"),
        bigquery.SchemaField("id", "STRING")
    ],
    time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Timestamp",  # name of the column to use for partitioning
        expiration_ms=7776000000  # 90 days
    )
)

data = {"foo": ["fighters", "manchu", "bar", "kien"],
        "Timestamp": [datetime.now() - timedelta(days=i) for i in range(4)],  # one row per day
        "bar": [100, 50, 75, 66],
        "id": ["1", "2", "3", "14"]}

df = pd.DataFrame(data)

load_job = bq_client.load_table_from_dataframe(
    df, '.'.join([PROJECT, DATASET, TABLE]), job_config = job_config
)

result = load_job.result()

print("Written {} rows to {}".format(result.output_rows, result.destination))
print("Partitioning: {}".format(result.time_partitioning))