
I am using the following Python Cloud Function on GCP to load a CSV file from a GCS bucket into a BigQuery table.

from typing import List

from google.cloud import bigquery


def csv_in_gcs_to_table(bucket_name: str, object_name: str, dataset_id: str,
                        table_id: str,
                        schema: List[bigquery.SchemaField]) -> None:
    """Upload a CSV file from GCS to a BigQuery table.

    If the table already exists, its data is overwritten.

    Args:
        bucket_name: Name of the bucket holding the object.
        object_name: Name of the object to be loaded.
        dataset_id: ID of the dataset where the table is located.
        table_id: ID of the table.
        schema: Schema of the destination table.
    """
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    uri = "gs://{}/{}".format(bucket_name, object_name)
    load_job = client.load_table_from_uri(uri,
                                          dataset_ref.table(table_id),
                                          job_config=job_config)
    load_job.result()  # Wait for the load job to complete.
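
Called directly, it would look like the sketch below; the bucket, object, dataset, and table names are hypothetical placeholders for illustration.

schema = [
    bigquery.SchemaField("col1", "STRING"),
    bigquery.SchemaField("col2", "STRING"),
]
# Placeholder names for illustration only.
csv_in_gcs_to_table("my-bucket", "data.csv", "my_dataset", "my_table", schema)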

The function is triggered every time a new file lands in the bucket, and picks the file that corresponds to the object_name argument.

I would like the load function to pick the file that was uploaded last to the bucket, in other words, the file that triggered the event.

How can this be achieved?

I'm not sure I understand the problem. As far as I know, object_name is supposed to already identify the file that triggered the Cloud Function, which seems to be what you want. Are you saying that object_name refers to a different file than the one that triggered the upload for you? - Frank van Puffelen
When I used the object_name argument, I encountered the following error: TypeError: csv_in_gcs_to_table() missing 1 required positional argument: 'object_name', which led me to think that I need to supply the file name. - ronencozen
It depends on how the Cloud Function is triggered, which I can't figure out from the code you shared. For a Cloud Storage trigger, you'd get the object name from event["name"], as shown here: cloud.google.com/functions/docs/tutorials/… - Frank van Puffelen
So how do you call this function? The link I provided shows how to get the name of the file that triggered the event from a Python Cloud Function, but the code you shared looks quite different. - Frank van Puffelen
Good to hear you got it working. Can you post that as an answer instead of as an update to the question? That way the system (and others) will also know that your problem has been addressed. Also see: stackoverflow.com/help/self-answer - Frank van Puffelen

1 Answer


Based on @FrankvanPuffelen's advice, I adapted the function to take the event argument, which holds all of the event's metadata, including the name of the file that triggered it.

from google.cloud import bigquery


def csv_in_gcs_to_table(event, context):
    """Background Cloud Function triggered when an object lands in the bucket."""
    client = bigquery.Client()

    # The event payload carries the metadata of the object that triggered
    # the function; event['bucket'] holds the bucket name as well.
    bucket_name = "bucket_name"
    object_name = event['name']
    table_id = "project_id.dataset_name.table_name"

    schema = [
        bigquery.SchemaField('col1', 'STRING'),
        bigquery.SchemaField('col2', 'STRING'),
    ]

    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.skip_leading_rows = 1

    uri = "gs://{}/{}".format(bucket_name, object_name)

    load_job = client.load_table_from_uri(uri,
                                          table_id,
                                          job_config=job_config)
    load_job.result()  # Wait for the load job to complete.
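
To sanity-check the entry point locally, one can simulate the event dict that a google.storage.object.finalize trigger delivers; the field names below (bucket, name) match that payload, while the file name itself is hypothetical.

# Hypothetical local smoke test: simulate the trigger payload.
fake_event = {
    "bucket": "bucket_name",
    "name": "new_file.csv",
}
csv_in_gcs_to_table(fake_event, context=None)

Once deployed with a Cloud Storage trigger on the google.storage.object.finalize event, the function receives the real payload for each newly uploaded file.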