I'm currently loading data from a CSV file in Cloud Storage into BigQuery using a Cloud Function.
Right now my logic is to delete the existing table in BigQuery (i.e. TableE), create a new table with the same name, and load the CSV data into that new table.
The function is triggered whenever someone uploads an updated CSV file to Cloud Storage.
This works fine as long as the data in the CSV file matches the data type of each column. However, when someone uploads a CSV with an invalid value (e.g. N/A in an INTEGER column), the code deletes TableE, and the load into the new table then fails with a parse error.
The outcome is that the existing TableE is deleted but no data is loaded into BigQuery, and all the reports that depend on TableE are impacted.
Is there any solution to this? Is there a way to delete the existing table only if there are no errors in the CSV data?
One option I can think of is to use a temporary table (I've sketched what I mean below my current code), but is that the only way to handle this?
Below is my current code -
def hello_gcs(event, context):
    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Bucket: {}'.format(event['bucket']))
    print('File: {}'.format(event['name']))
    print('Metageneration: {}'.format(event['metageneration']))
    print('Created: {}'.format(event['timeCreated']))
    print('Updated: {}'.format(event['updated']))

    # Only process the expected upload; ignore other files in the bucket.
    if event['name'] == "DUMMY FLIGHT DETAIL LIST 18012021.csv":
        print('blob checking: {}'.format(event['name']))

        def inner_function():
            from google.cloud import bigquery

            # Construct a BigQuery client object.
            client = bigquery.Client()

            # TODO(developer): Set table_id to the ID of the table to create.
            table_id = "project-a-307309.DatasetA.TableE"

            # Delete the existing table (if any) before reloading it.
            client.delete_table(table_id, not_found_ok=True)  # Make an API request.
            print("Deleted table '{}'.".format(table_id))

            job_config = bigquery.LoadJobConfig(
                schema=[
                    bigquery.SchemaField("SITA", "STRING"),
                    bigquery.SchemaField("STATUS", "STRING"),
                    bigquery.SchemaField("PRIORITY", "STRING"),
                    bigquery.SchemaField("BRAND", "STRING"),
                    bigquery.SchemaField("FLIGHT_NAME", "STRING"),
                    bigquery.SchemaField("FLIGHT_TYPE", "STRING"),
                    bigquery.SchemaField("City", "STRING"),
                    bigquery.SchemaField("Destination_Tier", "STRING"),
                    bigquery.SchemaField("Country", "STRING"),
                    bigquery.SchemaField("Management_Type", "STRING"),
                    bigquery.SchemaField("Area", "STRING"),
                    bigquery.SchemaField("Seat_Count", "INTEGER"),
                    bigquery.SchemaField("Tier", "STRING"),
                    bigquery.SchemaField("New_Segment", "STRING"),
                    bigquery.SchemaField("Running_Date", "DATE"),
                    bigquery.SchemaField("Expected_Opening", "DATE"),
                    bigquery.SchemaField("New_Updated_Opening", "DATE"),
                    bigquery.SchemaField("COMMENT", "STRING"),
                    bigquery.SchemaField("URL", "STRING"),
                ],
                skip_leading_rows=6,
                # The source format defaults to CSV, so the line below is optional.
                source_format=bigquery.SourceFormat.CSV,
            )

            uri = "gs://dummy_flight_details/DUMMY FLIGHT DETAIL LIST 18012021.csv"

            load_job = client.load_table_from_uri(
                uri, table_id, job_config=job_config
            )  # Make an API request.
            load_job.result()  # Waits for the job to complete.

            destination_table = client.get_table(table_id)  # Make an API request.
            print("Loaded {} rows.".format(destination_table.num_rows))

        inner_function()

        def outer_func():
            from google.cloud import storage
            import datetime

            # Timestamp used to archive the processed file under a new name.
            ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("ts")
            print(ts)

            bucket_name = "dummy_flight_details"
            blob_name = "DUMMY FLIGHT DETAIL LIST 18012021.csv"
            new_name = "{} - DUMMY FLIGHT DETAIL LIST 18012021.csv".format(ts)
            print(new_name)

            def rename_blob():
                print("function entered")
                storage_client = storage.Client()
                bucket = storage_client.bucket(bucket_name)
                blob = bucket.blob(blob_name)
                # Rename the processed CSV so it isn't picked up again.
                new_blob = bucket.rename_blob(blob, new_name)
                print("Blob {} has been renamed to {}".format(blob.name, new_blob.name))

            rename_blob()

        outer_func()
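
For reference, this is roughly what I have in mind with the temporary-table approach (just a minimal sketch; the staging table name TableE_staging and the helper load_via_staging are placeholders I made up): load the CSV into a staging table first, and only overwrite TableE once that load has succeeded.

from google.cloud import bigquery

def load_via_staging(client, uri, job_config):
    # Placeholder IDs - the staging table just sits next to the real one.
    table_id = "project-a-307309.DatasetA.TableE"
    staging_table_id = "project-a-307309.DatasetA.TableE_staging"

    # Overwrite any leftover staging table from a previous failed run.
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    # 1. Load the CSV into the staging table. A parse error raises here,
    #    while TableE has not been touched yet.
    load_job = client.load_table_from_uri(uri, staging_table_id, job_config=job_config)
    load_job.result()

    # 2. The load succeeded, so replace TableE with the staging table's contents.
    copy_config = bigquery.CopyJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    client.copy_table(staging_table_id, table_id, job_config=copy_config).result()

    # 3. Clean up the staging table.
    client.delete_table(staging_table_id, not_found_ok=True)

But this needs an extra table and a copy job on every upload, so I'd like to know if there is a simpler or more standard way to make the whole load fail safely.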