
I have SQLite .db files stored in a Google Cloud Storage bucket. These files are uploaded on a daily basis. I want to stream/write data from a specific table within the .db files to a BigQuery table. I tried writing a Cloud Function that connects to the SQLite db, fetches the records into a pandas DataFrame, and loads them into the BQ table. My Python program is not working because it cannot connect to the SQLite db at runtime to fetch the data.

My code:

import sqlite3
import pandas as pd
from google.cloud import bigquery
from google.cloud import storage

def bqDataLoad(event, context):
    storage_client = storage.Client() 
    bucketName = 'my-bucket'
    blobName = 'mysqlite.db'
    bucket = storage_client.get_bucket(bucketName) 
    blob = bucket.blob(blobName)
    fileName = "gs://" + bucketName + "/" + blobName
        
    bigqueryClient = bigquery.Client()
    tableRef = bigqueryClient.dataset("ds").table("bqtable")
    cnx = sqlite3.connect(fileName) 
    dataFrame = pd.read_sql_query("SELECT * FROM sqlitetable", cnx)

    bigqueryJob = bigqueryClient.load_table_from_dataframe(dataFrame, tableRef)
    bigqueryJob.result()

When I pass sqlite3.connect(fileName) to "cnx", the Cloud Function throws an error saying it is unable to connect. I also tried sqlite3.connect(blob), but that fails because a string is expected.

Any help would be much appreciated.

Thanks


1 Answer


I believe you are not able to get a connection to the database because sqlite3.connect() expects a path on the local filesystem, so you would first need to download the .db file to the instance running your Cloud Function. Note that in the Cloud Functions environment the /tmp folder is the only writable directory, and because it is an in-memory filesystem, the RAM assigned to your Cloud Function at deployment time is what hosts the downloaded .db file.
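In essence, the pattern is to download the blob to /tmp and open the connection against that local copy. A minimal sketch using the bucket and object names from your question (adjust them to your setup):

import sqlite3
from google.cloud import storage

# Download the .db object to /tmp (the only writable directory) and
# connect to the local copy instead of the gs:// path.
storage_client = storage.Client()
blob = storage_client.bucket("my-bucket").blob("mysqlite.db")
blob.download_to_filename("/tmp/mysqlite.db")

cnx = sqlite3.connect("/tmp/mysqlite.db")  # succeeds: sqlite3 gets a real file path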

The more complete code snippet below should work end to end (it is based on the famous chinook SQLite sample database).

It is assumed that the .db object stored in your Cloud Storage bucket is not nested under any folder, and that you have already created the destination table in your BigQuery dataset with the following schema (derived from the schema of the SQLite database, since the specific query to run depends on it):

ArtistId    INTEGER   NULLABLE
Name        STRING    NULLABLE
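If the destination table does not exist yet, a minimal sketch of creating it with the BigQuery client library could look like this (the table ID is a placeholder; adjust it to your project and dataset):

from google.cloud import bigquery

client = bigquery.Client()

# Schema mirroring the artists table of the chinook sample database.
schema = [
    bigquery.SchemaField("ArtistId", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Name", "STRING", mode="NULLABLE"),
]

table = bigquery.Table("[PROJECT-ID].[DATASET-ID].[TABLE-ID]", schema=schema)
client.create_table(table)  # raises a Conflict error if the table already exists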

  1. Within your local development setup, create a directory with the following files:

a. requirements.txt

google-cloud-storage
google-cloud-bigquery
pandas
pyarrow

b. main.py

from google.cloud import storage
from google.cloud import bigquery
import sqlite3
import pandas as pd

BUCKET_NAME = "[YOUR-BUCKET]" #Change as per your setup
OBJECT_NAME = "chinook.db" #Change as per your setup
DATABASE_NAME_IN_RUNTIME = "/tmp/chinook.db" # Only the /tmp folder is writable in the Cloud Functions runtime
QUERY = "SELECT * FROM artists;" #Change as per your query
TABLE_ID = "[PROJECT-ID].[DATASET-ID].[TABLE-ID]" # Change with the format your-project.your_dataset.your_table_name

storage_client = storage.Client()
bigquery_client = bigquery.Client()

# Fetch the .db file from Cloud Storage
def get_db_file_from_gcs(bucket_name, object_name, filename):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(object_name)
    return blob.download_to_filename(filename)

# Opens a connection to the SQLite database
def connect_to_sqlite_db(complete_filepath_to_db):
    connection = sqlite3.connect(complete_filepath_to_db)
    return connection

# Runs the query and returns all result rows
def run_query_to_db(connection, query):
    with connection:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

# Runs the query and returns the results as a pandas DataFrame
def run_query_to_db_with_pandas(connection, query):
    with connection:
        df = pd.read_sql_query(query, connection)
        return df

def gcssqlite_to_bq(request):
    print("Getting .db file from Storage")
    get_db_file_from_gcs(BUCKET_NAME, OBJECT_NAME, DATABASE_NAME_IN_RUNTIME)
    print("Downloaded .db file in CF instance RAM")
    print("Trying to connect to database using sqlite")
    cnx = connect_to_sqlite_db(DATABASE_NAME_IN_RUNTIME)
    print("Connected to database")
    print("Attempting to perform a query")
    results = run_query_to_db_with_pandas(cnx, QUERY)
    print("Writing data to BigQuery")
    bigqueryJob = bigquery_client.load_table_from_dataframe(results, TABLE_ID)
    bigqueryJob.result()
    print("The Job to write to Big Query is finished")
    return "Executed Function"

  2. Update the main.py file as per your requirements.

  3. Use the following command to deploy a publicly invokable Cloud Function, changing the parameters as per your needs:

gcloud functions deploy [CLOUD_FUNCTION_NAME] --region [REGION] --entry-point gcssqlite_to_bq --timeout 540 --memory 1024MB --runtime python38 --trigger-http --allow-unauthenticated
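Once deployed, you can smoke-test the function by calling its HTTP trigger URL. A minimal sketch in Python, assuming the default HTTP trigger URL format with region, project, and function name as placeholders:

import urllib.request

# Hypothetical trigger URL; replace the placeholders with your deployment values.
url = "https://[REGION]-[PROJECT-ID].cloudfunctions.net/[CLOUD_FUNCTION_NAME]"

with urllib.request.urlopen(url) as response:
    print(response.status, response.read().decode())  # expects "Executed Function"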

DISCLAIMER: This approach is prone to out-of-memory (OOM) errors depending on the size of your .db file and the complexity of the queries issued against the database. Notice the memory limits offered by Cloud Functions (currently 4 GB); if your .db files hold a lot of data (on the order of GBs or TBs), a better approach might be to migrate the data to Cloud SQL (SQLite itself is not supported, so it would have to be moved into MySQL, PostgreSQL, or SQL Server) and use federated queries directly from BigQuery.
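For completeness, a hedged sketch of that federated-query alternative, assuming the data has already been migrated to a Cloud SQL instance and a BigQuery connection resource for it exists (the connection path below is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()

# EXTERNAL_QUERY pushes the inner SQL down to the Cloud SQL instance and
# returns the results to BigQuery; connection path and query are placeholders.
federated_sql = """
SELECT ArtistId, Name
FROM EXTERNAL_QUERY(
  'projects/[PROJECT-ID]/locations/[REGION]/connections/[CONNECTION-ID]',
  'SELECT ArtistId, Name FROM artists;'
)
"""

for row in client.query(federated_sql).result():
    print(row.ArtistId, row.Name)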