I have a Python snippet deployed as a Google Cloud Function. The goal is to automatically load CSV files from a Cloud Storage bucket into BigQuery tables, and the function is triggered whenever a new file is uploaded to the bucket. However, the function crashes, and I can't work out what I'm doing wrong.
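For context, this is roughly how I deploy it (the runtime and flags are written from memory, so they may not match my setup exactly):

gcloud functions deploy csv_loader \
    --runtime python39 \
    --trigger-resource tintin_bucket \
    --trigger-event google.storage.object.finalize

And here is the function code itself: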
import gcsfs
import os
import pandas as pd
import re
import numpy as np
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound
# Environment variables
metric = "availability"
table = "availability_daily_2"
bucket = "tintin_bucket"
staging_folder = "profitero/staging/daily/"+metric
processed_folder = "profitero/processed/daily/"+metric
dataset = "tintin_2"
# Returns a list with the names of all blobs in a given bucket
def list_blobs(bucket):
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket)
    blobs_list = []
    for blob in blobs:
        blobs_list.append(blob.name)
    return(blobs_list)
# Function to process file names into organized data
def processFileNames(list_of_file_names):
    # Define helper functions
    def searchFunction(pattern, x):
        output = re.search(pattern, x)
        if output is None:
            return(None)
        else:
            return(output.group(0))
    def getdates(x): return(searchFunction(r"(\d{4}-\d{2}-\d{2})", x))
    def getcountry(x): return(searchFunction(r"([A-Z]{2})", x))
    def getmetric(x): return(searchFunction(r"(Placement|Pricing|Ratings|Availability|Content|Assortment)", x))
    def getfiletype(x): return(searchFunction(r"(zip|csv)", x))
    def isDaily(x): return(searchFunction(r"(Daily)", x))
    # Create empty dataframe
    d = {'filename': list_of_file_names}
    df = pd.DataFrame(data=d)
    # Fill dataframe
    df['date'] = df.filename.apply(lambda x: getdates(x))
    df['date'] = pd.to_datetime(df['date'])
    df['country'] = df.filename.apply(lambda x: getcountry(x))
    df['metric'] = df.filename.apply(lambda x: getmetric(x))
    df['filetype'] = df.filename.apply(lambda x: getfiletype(x))
    df['isDaily'] = df.filename.apply(lambda x: isDaily(x))
    df.replace('', np.nan, inplace=True)
    #df.dropna(inplace=True)
    return(df)
# Function to clean column names so they are valid BigQuery identifiers
def cleanCols(x):
    #x = re.sub('[^0-9a-zA-Z]+', '', x)
    x = x.replace(" ", "_")
    #x = x.lower()
    x = x.replace("-", "_")
    x = x.replace("#", "no")
    x = x.replace("3p", "third_party")
    x = x.replace("3P", "third_party")
    x = x.replace("&", "and")
    x = x.replace("'", "")
    return(x)
# Function to move processed blobs into processed folder
def move_blob(bucket, file):
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket)
    source_blob = source_bucket.blob(file)
    destination_bucket = storage_client.bucket(bucket)
    destination_blob_name = "profitero/processed/daily/" + metric + "/" + file.rsplit("/", 1)[1]
    try:
        blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
        blob_delete = source_bucket.delete_blob(file)
        print("Blob {} moved to blob {}.".format(source_blob.name, blob_copy.name))
    except NotFound:
        print("Not found error")
        pass
# Main function - triggered once per uploaded file; reads the CSV into memory and loads it into BigQuery
def csv_loader(data, context):
    #request_json = request.get_json(silent=True)
    print(data['name'])
    p = re.compile(r'profitero/staging/daily/' + metric + r'/.*csv')
    if p.match(data['name']):
        try:
            df = pd.read_csv("gs://" + bucket + "/" + data['name'])
            print("Read CSV")
            df['event_id'] = context.event_id
            print("Attached event id")
            df['event_timestamp'] = context.timestamp
            print("Attached timestamp")
            df.rename(columns=lambda x: cleanCols(x), inplace=True)
            df['RPC'] = df['RPC'].astype(str)
            print("Cleaned column names")
            df = df[['Date', 'Country', 'Retailer', 'Product_Title', 'Match_Type', 'Availability', 'URL', 'Manufacturer', 'Brand', 'Sub_Brand', 'Account_Category', 'RPC']]
            print("Selected relevant columns")
            df.to_gbq("tintin_2." + table, if_exists="append", project_id="emea-devices-services")
            print("Added to table")
            move_blob(bucket, data['name'])
            print("Moved file")
        except Exception as e:
            print(e)
    else:
        pass
    # Notify of success
    return("Success!")