0 votes

I'm currently loading data from a CSV file in Cloud Storage to BigQuery using a Cloud Function.

Right now my logic is to delete the existing table (i.e. TableE) in BigQuery, create a new table with the same name (TableE), and load the CSV data into that new table.

The function triggers when someone uploads an updated CSV file to Cloud Storage.

This logic works fine when the data in the CSV file matches the datatype of each column. However, when someone uploads a CSV file with an incorrect value (e.g. N/A in an INTEGER column), the code deletes TableE, but the load into the new BigQuery table throws a parse error.

So the outcome is that the existing TableE gets deleted but no data is loaded into BigQuery because of the parse error, and all the reports that depend on TableE are impacted.

Is there any solution to this? Is there a way to delete the existing table only if there is no error in the CSV file data?

One option I can think of is to load into a temporary table first, but is that the only way to handle this?
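To make it concrete, this is roughly what I mean by the temporary-table approach (just a sketch; TableE_staging is a made-up staging table name, and the schema and skip_leading_rows would be the same as in my code below):

from google.cloud import bigquery

client = bigquery.Client()
staging_table_id = "project-a-307309.DatasetA.TableE_staging"  # made-up staging table
final_table_id = "project-a-307309.DatasetA.TableE"
uri = "gs://dummy_flight_details/DUMMY FLIGHT DETAIL LIST 18012021.csv"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=6,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    # schema=[...] would be the same SchemaField list as in the code below
)

# Load into the staging table first; a bad CSV fails here and TableE is never touched.
client.load_table_from_uri(uri, staging_table_id, job_config=job_config).result()

# Only after a successful load, overwrite TableE from the staging table.
copy_config = bigquery.CopyJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
client.copy_table(staging_table_id, final_table_id, job_config=copy_config).result()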

Below is my current code -

def hello_gcs(event, context):
    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Bucket: {}'.format(event['bucket']))
    print('File: {}'.format(event['name']))
    print('Metageneration: {}'.format(event['metageneration']))
    print('Created: {}'.format(event['timeCreated']))
    print('Updated: {}'.format(event['updated']))
    if event['name'] == "DUMMY FLIGHT DETAIL LIST 18012021.csv":
        print('blob checking: {}'.format(event['name']))

        def inner_function():
            from google.cloud import bigquery

            # Construct a BigQuery client object.
            client = bigquery.Client()

            # TODO(developer): Set table_id to the ID of the table to create.
            table_id = "project-a-307309.DatasetA.TableE"

            client.delete_table(table_id, not_found_ok=True)  # Make an API request.
            print("Deleted table '{}'.".format(table_id))

            job_config = bigquery.LoadJobConfig(
                schema=[
                    bigquery.SchemaField("SITA", "STRING"),
                    bigquery.SchemaField("STATUS", "STRING"),
                    bigquery.SchemaField("PRIORITY", "STRING"),
                    bigquery.SchemaField("BRAND", "STRING"),
                    bigquery.SchemaField("FLIGHT_NAME", "STRING"),
                    bigquery.SchemaField("FLIGHT_TYPE", "STRING"),
                    bigquery.SchemaField("City", "STRING"),
                    bigquery.SchemaField("Destination_Tier", "STRING"),
                    bigquery.SchemaField("Country", "STRING"),
                    bigquery.SchemaField("Management_Type", "STRING"),
                    bigquery.SchemaField("Area", "STRING"),
                    bigquery.SchemaField("Seat_Count", "INTEGER"),
                    bigquery.SchemaField("Tier", "STRING"),
                    bigquery.SchemaField("New_Segment", "STRING"),
                    bigquery.SchemaField("Running_Date", "DATE"),
                    bigquery.SchemaField("Expected_Opening", "DATE"),
                    bigquery.SchemaField("New_Updated_Opening", "DATE"),
                    bigquery.SchemaField("COMMENT", "STRING"),
                    bigquery.SchemaField("URL", "STRING"),
                ],
                skip_leading_rows=6,
                # The source format defaults to CSV, so the line below is optional.
                source_format=bigquery.SourceFormat.CSV,
            )

            uri = "gs://dummy_flight_details/DUMMY FLIGHT DETAIL LIST 18012021.csv"
            load_job = client.load_table_from_uri(
                uri, table_id, job_config=job_config
            )  # Make an API request.
            load_job.result()  # Waits for the job to complete.

            destination_table = client.get_table(table_id)  # Make an API request.
            print("Loaded {} rows.".format(destination_table.num_rows))

        inner_function()

        def outer_func():
            from google.cloud import storage
            import datetime

            ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("ts")
            print(ts)
            bucket_name = "dummy_flight_details"
            blob_name = "DUMMY FLIGHT DETAIL LIST 18012021.csv"
            new_name = "{} - DUMMY FLIGHT DETAIL LIST 18012021.csv".format(ts)
            print(new_name)

            def rename_blob():
                print("function entered")
                storage_client = storage.Client()
                bucket = storage_client.bucket(bucket_name)
                blob = bucket.blob(blob_name)
                new_blob = bucket.rename_blob(blob, new_name)
                print("Blob {} has been renamed to {}".format(blob.name, new_blob.name))

            rename_blob()

        outer_func()
Hi, you can take a look at this doc. You can also check this other post, it might help you: Loading Data Into BigQuery from Cloud Storage - Mathew Bellamy

2 Answers

1 vote

You can use a specific writeDisposition, here it is in the Python library. WRITE_TRUNCATE does exactly what you need: if the new data are valid, the existing data are truncated and replaced. If not, nothing is deleted.

Add this line to your code:

job_config.write_disposition = 'WRITE_TRUNCATE'
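Applied to your function, it would look roughly like this (a sketch reusing your table_id and uri; the delete_table call is no longer needed):

from google.cloud import bigquery

client = bigquery.Client()
table_id = "project-a-307309.DatasetA.TableE"
uri = "gs://dummy_flight_details/DUMMY FLIGHT DETAIL LIST 18012021.csv"

job_config = bigquery.LoadJobConfig(
    skip_leading_rows=6,
    source_format=bigquery.SourceFormat.CSV,
    # plus the schema=[...] list from your question
)
job_config.write_disposition = 'WRITE_TRUNCATE'

# No client.delete_table() call needed any more.
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # Raises on a parse error, and the existing TableE stays as it was.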
1 vote

There are a few ways that you can handle this.

First off, instead of deleting and then re-creating the table each time, you can set the LoadJobConfig.write_disposition property to the WriteDisposition constant WRITE_TRUNCATE; see details here.

For example:

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=6,
)

This way, you don't have to explicitly delete the table; you simply overwrite its rows. If the load job fails, BigQuery leaves the table untouched and the previous rows are kept; the overwrite only happens once the job succeeds.

You might also want to incorporate error handling, so that based on the result of the job (e.g. the job failed), you get an email or message letting you know the load did not succeed.
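For example, something along these lines (a sketch; notify() is a hypothetical placeholder for whatever alerting you use, e.g. email or Pub/Sub):

from google.cloud import bigquery
from google.api_core.exceptions import BadRequest

def notify(message):
    # Hypothetical placeholder - wire this up to email, Pub/Sub, Slack, etc.
    print("ALERT: {}".format(message))

def load_csv(client, uri, table_id, job_config):
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    try:
        load_job.result()  # Waits for the job; raises if the load fails.
        print("Loaded {} rows.".format(client.get_table(table_id).num_rows))
    except BadRequest as err:
        # load_job.errors holds the row-level errors reported by BigQuery.
        print("Load failed: {}".format(err))
        for e in load_job.errors or []:
            print(e)
        notify("Load of {} into {} failed".format(uri, table_id))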