0
votes

I'm using a python code snippet and deploying it using cloud function, the goal is to auto upload csv data from storage buckets to big query tables, the function trigger is 'whenever a new file is uploaded in the storage bucket'. However, the code crashes , please let me know if I'm doing something wrong.

import gcsfs
import os
import pandas as pd
import re
import numpy as np
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound


# Environment variables
metric = "availability"
table = "availability_daily_2" 

bucket = "tintin_bucket" 
staging_folder = "profitero/staging/daily/"+metric
processed_folder = "profitero/processed/daily/"+metric
dataset = "tintin_2"


# Returns a list with all blobs in a given bucket
def list_blobs(bucket):
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket)
    blobs_list = []
    for blob in blobs:
      blobs_list.append(blob.name)
    return(blobs_list)

# Function to process file names into organized data
def processFileNames(list_of_file_names):
    # Define helper functions
    def searchFunction(pattern,x): 
      output = re.search(pattern,x)
      if output is None:
        return(None)
      else:
        return(output.group(0))
    def getdates(x): return(searchFunction(r"(\d{4}-\d{2}-\d{2})",x))
    def getcountry(x): return(searchFunction(r"([A-Z]{2})",x))
    def getmetric(x): return(searchFunction(r"(Placement|Pricing|Ratings|Availability|Content|Assortment)",x)) 
    def getfiletype(x): return(searchFunction(r"(zip|csv)",x))
    def isDaily(x): return(searchFunction(r"(Daily)",x))
    # Create empty dataframe
    d = {'filename': list_of_file_names}
    df = pd.DataFrame(data=d)
    # Fill dataframe
    df['date'] = df.filename.apply(lambda x: getdates(x) )
    df['date'] = pd.to_datetime(df['date'])
    df['country'] = df.filename.apply(lambda x: getcountry(x) )
    df['metric'] = df.filename.apply(lambda x: getmetric(x) )
    df['filetype'] = df.filename.apply(lambda x: getfiletype(x) )
    df['isDaily'] = df.filename.apply(lambda x: isDaily(x) )
    df.replace('',np.nan,inplace=True)
    #df.dropna(inplace=True)
    return(df)

def cleanCols(x):
  #x = re.sub('[^0-9a-zA-Z]+', '', x)
  x = x.replace(" ", "_")
  #x = x.lower() 
  x = x.replace("-","_")
  x = x.replace("#","no")
  x = x.replace("3p","third_party")
  x = x.replace("3P","third_party")
  x = x.replace("&","and")
  x = x.replace("'","")
  return(x)

# Function to move processed blobs into processed folder
def move_blob(bucket, file):
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket)
    source_blob = source_bucket.blob(file)
    destination_bucket = storage_client.bucket(bucket)
    destination_blob_name = "profitero/processed/daily/"+metric+"/"+file.rsplit("/",1)[1]
    try:
        blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
        blob_delete = source_bucket.delete_blob(file)
        print("Blob {} moved to blob {}.".format(source_blob.name,blob_copy.name))
    except NotFound:
        print("Not found error")
        pass

# Main function - Lists CSVs in bucket, reads them into memory, loads them into BigQuery
def csv_loader(data,context):
    #request_json = request.get_json(silent=True)
    print(data['name'])
    p = re.compile('profitero\/staging\/daily\/'+metric+'\/.*csv')
    if p.match(data['name']):
        try: 
            df = pd.read_csv("gs://"+bucket+"/"+data['name'])
            print("Read CSV")
            df['event_id'] = context.event_id
            print("Attached event id")
            df['event_timestamp'] = context.timestamp
            print("Attached timestamp")
            df.rename(columns=lambda x: cleanCols(x),inplace=True)
            df['RPC'] = df['RPC'].astype(str)
            print("Cleaned column names")
            df = df[['Date', 'Country', 'Retailer', 'Product_Title', 'Match_Type', 'Availability', 'URL', 'Manufacturer', 'Brand', 'Sub_Brand', 'Account_Category','RPC']]
            print("Selected relevant columns")
            df.to_gbq("tintin_2."+table,if_exists="append",project_id="emea-devices-services")
            print("Added to table")
            move_blob(bucket,data['name'])
            print("Moved file")
        except Exception as e: 
            print(e)
    else:
        pass

    # Notify of sucess
    return("Sucess!")
2
When it crashes, what error message do you get? What do you see in the logs?Dustin Ingram

2 Answers

0
votes

The csv_loader function within your code and more specifically the pd.read_csv() method that reads the CSV into memory will be most likely the culprit of the Cloud Function to crash depending on the size of the CSV. This can be a memory intensive task and releasing the memory used by the dataframe can be a tricky task.

Depending on the size of the CSV files to be processed provision your Cloud Function with enough memory (default is 256MB and max is 2048MB) in order to make sure that your function does not run into OOM issues and crash.

Another alternative, in order to avoid such a bottleneck for your application would be to consider streaming the data from Cloud Storage into BigQuery as it is explained extensively in the following article. Find the relevant repository here.

0
votes

There are multiple solutions you can look into:

File Size < 50 MB: loading a text files (.txt) in cloud storage into big query table

File Size > 50 MB: Unable to load a 340 MB file into BigQuery using Cloud Function

Note: The first solution utilizing the compute power of Cloud Function, where as, the second solution utilizing BigQuery compute power.