It is the first time I am using a Cloud Function, and this Cloud Function does just one job: every time a file (.csv) is uploaded to a GCS bucket, the function runs and copies that file to a BigQuery table without any transformations. What would be the most efficient way to test (unit, not integration) the gcs_to_bq method?
def get_bq_table_name(file_name):
    """Return the destination BigQuery table name for an uploaded file.

    Any file whose name contains the substring 'car' is routed to the
    'car' table; every other file goes to the 'bike' table.

    Args:
        file_name: The GCS object name of the uploaded file.

    Returns:
        Either 'car' or 'bike'.
    """
    # A plain substring test replaces re.search('car', ...): the pattern is a
    # fixed literal with no regex metacharacters, so `in` is equivalent,
    # clearer, and avoids the regex machinery entirely.
    if 'car' in file_name:
        return 'car'
    return 'bike'
def gcs_to_bq(event, context):
    """Cloud Function entry point: load an uploaded GCS CSV into BigQuery.

    Triggered on object finalize in a GCS bucket. Reads the bucket and
    object name from ``event``, picks the destination table via
    get_bq_table_name, and appends the CSV contents to that table with no
    transformation.

    Args:
        event: GCS event payload; 'bucket' and 'name' keys are read here.
        context: Event metadata supplied by the runtime (unused).
    """
    # One client per invocation.
    bq_client = bigquery.Client()

    destination = get_bq_table_name(event['name'])
    table_id = f'xxx.yyy.{destination}'

    load_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("datetime", "STRING"),
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("id", "STRING"),
        ],
        skip_leading_rows=1,
        # CSV is already the default source format; stated for clarity.
        source_format=bigquery.SourceFormat.CSV,
    )

    source_uri = f"gs://{event['bucket']}/{event['name']}"

    # Kick off the load job and block until it completes.
    job = bq_client.load_table_from_uri(
        source_uri, table_id, job_config=load_config
    )
    job.result()

    # Report how many rows the destination table now holds.
    table = bq_client.get_table(table_id)
    print("Loaded {} rows.".format(table.num_rows))