Is it possible to import data already in Cloud Storage into a temporary table in BigQuery using Python? Can I create a BigQuery temporary table in Python and insert data into it?
1 vote
What is the format of your data, .csv?
- manesioz
Yes, it is possible, but you need to execute a few steps. Are you running Python on App Engine, Cloud Functions, or locally?
- Luiz Lai
@ZacharyManesiotis, it can be CSV or JSON!
- Felipe FB
@LuizLai, I am testing on my local machine, but the idea is to create a function to run inside Airflow.
- Felipe FB
2 Answers
5 votes
You can only create true temporary tables as part of a BigQuery script or stored procedure.
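For reference, a minimal sketch of what that looks like from the Python client, run as a multi-statement query (script); the table and column names are just placeholders, and the temp table is dropped automatically when the script finishes:

from google.cloud import bigquery

client = bigquery.Client()

# A multi-statement query (script): the temp table only exists for the
# duration of the script.
script = """
CREATE TEMP TABLE tmp_users AS
SELECT 'c-1234' AS id, 'John Smith' AS full_name, 39 AS age;

SELECT * FROM tmp_users;
"""

# result() returns the rows of the last statement in the script.
for row in client.query(script).result():
    print(dict(row))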
What you can do instead is create regular tables with a random name suffix and a short expiration, one hour in my example. The example function below creates the temp table definition and only needs a dataset as a parameter.
from google.cloud import bigquery
import datetime, pytz, random

PROJECT = "myproject"


def get_temp_table(dataset: str, table_name: str = None, project=None) -> bigquery.Table:
    """Build a table definition with a random suffix and a one-hour expiration."""
    prefix = "temp"
    suffix = random.randint(10000, 99999)
    if not table_name:
        table_name = "noname"
    temp_table_name = f"{dataset}.{prefix}_{table_name}_{suffix}"
    if project:
        temp_table_name = f"{project}.{temp_table_name}"
    tmp_table_def = bigquery.Table(temp_table_name)
    # The table deletes itself one hour after creation.
    tmp_table_def.expires = datetime.datetime.now(pytz.utc) + datetime.timedelta(
        hours=1
    )
    return tmp_table_def


client = bigquery.Client(project=PROJECT)

tmp_table_def = get_temp_table("mydataset", "new_users", project=PROJECT)
tmp_table_def.schema = [
    bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]

tmp_table = client.create_table(tmp_table_def)  # type: bigquery.Table

data = [
    {"id": "c-1234", "full_name": "John Smith", "age": 39},
    {"id": "c-1234", "full_name": "Patricia Smith", "age": 41},
]

# Streaming insert; returns a list of per-row errors (empty on success).
errors = client.insert_rows(tmp_table, data)
print(
    f"Loaded {len(data)} rows into "
    f"{tmp_table.dataset_id}:{tmp_table.table_id} with {len(errors)} errors"
)
0 votes
(This draft doesn't consider a temporary table, but I think it can help; a sketch for adding an expiration follows the code.) I used this with Google Cloud Functions and Python 3.7 and it works fine.
from google.cloud import storage, bigquery
import io
import os
import pandas as pd


def upload_dataframe_gbq(df, table_name):
    bq_client = bigquery.Client()
    dataset_id = 'your_dataset_id'
    dataset_ref = bq_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_name)

    job = bq_client.load_table_from_dataframe(df, table_ref)
    job.result()  # Waits for table load to complete.
    assert job.state == "DONE"

    table = bq_client.get_table(table_ref)
    print(table.num_rows)


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "your_credentials.json"

# Download the CSV from Cloud Storage and read it into a DataFrame.
client = storage.Client()
bucket = client.get_bucket('your_bucket_name')
blob = bucket.blob('sample.csv')
content = blob.download_as_string()

csv_content = io.BytesIO(content)
df = pd.read_csv(csv_content, sep=",", header=0)

table_name = "your_big_query_table_name"
upload_dataframe_gbq(df, table_name)
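If you want the loaded table to behave like a temporary table, a minimal sketch, assuming the table created by upload_dataframe_gbq already exists, is to set an expiration on it after the load:

from google.cloud import bigquery
import datetime

bq_client = bigquery.Client()
table = bq_client.get_table("your_dataset_id.your_big_query_table_name")

# Expire the table one hour from now so it cleans itself up automatically.
table.expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
bq_client.update_table(table, ["expires"])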