0 votes

I have written a Cloud Storage trigger-based Cloud Function. I have 10-15 files landing at 5-second intervals in a Cloud Storage bucket, and the function loads the data into a BigQuery table (truncate and load).

While there are 10 files in the bucket, I want the Cloud Function to process them sequentially, i.e. one file at a time, since all the files target the same table.

Currently the Cloud Function is triggered for multiple files at a time, and the BigQuery operation fails because multiple loads try to access the same table concurrently.

Is there any way to configure this in the Cloud Function?

Thanks in Advance!

2
Do all the files write to the same table? If not, can you differentiate the destination table by a file prefix or a different path in GCS? How many files do you have per day? - guillaume blaquiere
Yes, we have a single table into which all the files are loaded, and it is a truncate-load table. No, I can't create multiple tables, as they would again point to the same final table. We receive at most 30 files a day, but it may vary - Riti
Do the files have a specific order? Or, do you perform a query after your truncate load? - guillaume blaquiere
No, there is no specific order of receiving or loading the files. Yes, we perform our query and transformation activity after the staging load. - Riti
Why do you need to process them sequentially? That complicates cloud architecture, and limits its scalability. To better understand the issue, read this: cloud.google.com/pubsub/docs/ordering - Doug Stevenson

2 Answers

0 votes

You can achieve this by using Pub/Sub and the max instances parameter on the Cloud Function.
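For instance, a possible setup (all resource names here are placeholders) is to publish the Cloud Storage notifications to a Pub/Sub topic and deploy the function with at most one instance, so only one file is processed at a time:

# Send bucket notifications to a Pub/Sub topic instead of triggering the function directly.
gsutil notification create -t gcs-file-events -f json gs://my-bucket

# Deploy the function with a single instance so executions are serialized.
gcloud functions deploy load_file_to_bq \
  --runtime python310 \
  --trigger-topic gcs-file-events \
  --entry-point process_file \
  --max-instances 1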

EDIT

Thanks to your code, I understood what happens. In fact, BigQuery is a declarative system: when you perform a query or a load job, a job is created and it runs in the background.

In Python, you can explicitly wait for the end of the job, but with pandas I didn't find how!

I just found a Google Cloud page that explains how to migrate from pandas to the BigQuery client library. As you can see, there is a line at the end

# Wait for the load job to complete.
job.result()

that waits for the end of the job.

You did it well in the _insert_into_bigquery_dwh function, but that's not the case in the staging _insert_into_bigquery_staging one. This can lead to two issues:

  • The dwh function works on old data, because the staging load isn't finished yet when you trigger that job
  • If the staging load takes, let's say, 10 seconds and runs in the "background" (you don't explicitly wait for its end in your code) while the dwh step takes 1 second, the next file is processed as soon as the dwh function ends, even though the staging load is still running in the background. And that leads to your issue (see the sketch below).
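Here is a minimal sketch of what the staging load could look like with an explicit wait, assuming it loads a pandas DataFrame with the BigQuery client library (the project, dataset, and table names are placeholders):

from google.cloud import bigquery

def _insert_into_bigquery_staging(dataframe):
    # Placeholder destination; replace with your real staging table.
    client = bigquery.Client()
    table_id = "my-project.my_dataset.staging_table"

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    # The load job runs asynchronously on BigQuery's side...
    job = client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)

    # ...so block here until it finishes, so that the dwh step only ever
    # reads fully loaded staging data.
    job.result()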
0 votes

The architecture you describe isn't the same as the one from the documentation you linked. Note that in the flow diagram and the code samples the storage event triggers a Cloud Function which streams the data directly into the destination table. Since BigQuery allows multiple concurrent streaming inserts, several function executions can run at the same time without problems. In your use case the intermediate table, loaded with write-truncate for data cleaning, makes a big difference: each execution needs the previous one to finish, which requires a sequential processing approach.

I would like to point out that Pub/Sub doesn't let you configure the rate at which messages are delivered: if 10 messages arrive at the topic, they will all be sent to the subscriber, even if they are processed one at a time. Limiting the function to one instance may therefore lead to a backlog and could increase latency as well. That said, since the expected workload is 15-30 files a day, this may not be a big concern.

If you'd like to have parallel executions, you may try creating a new table for each message and setting a short expiration time on it via the table.expires setter, so that multiple executions don't conflict with each other. Here is the related library reference. Otherwise the great answer from Guillaume would completely get the job done.
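As a minimal sketch of that idea, assuming the incoming file name can be turned into a unique table suffix (the project, dataset, schema, and naming convention are placeholders):

from datetime import datetime, timedelta, timezone
from google.cloud import bigquery

def create_staging_table_for_file(client: bigquery.Client, file_name: str) -> bigquery.Table:
    # One staging table per incoming file, so concurrent executions don't clash.
    table_id = "my-project.my_dataset.staging_" + file_name.replace(".", "_")

    # Placeholder schema; use the real schema of your files.
    schema = [
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("value", "INTEGER"),
    ]
    table = bigquery.Table(table_id, schema=schema)

    # Short expiration so the per-file tables clean themselves up.
    table.expires = datetime.now(timezone.utc) + timedelta(hours=1)
    return client.create_table(table)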