0 votes

I am trying to load CSV file data from Cloud Storage into a BigQuery table using a Cloud Function in Python. My code works for data with 10 rows, but after increasing to 1000 rows it throws the error below. Total columns: 19, total rows: 1000.

The function is called only when someone uploads the CSV file with the updated data to Cloud Storage. This will happen very rarely, i.e. about once a month.

Forbidden: 403 Exceeded rate limits: too many table update operations for this table.

Below is the code

def hello_gcs(event, context):
    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Bucket: {}'.format(event['bucket']))
    print('File: {}'.format(event['name']))
    print('Metageneration: {}'.format(event['metageneration']))
    print('Created: {}'.format(event['timeCreated']))
    print('Updated: {}'.format(event['updated']))

    if event['name'] == "DUMMY FLIGHT DETAIL LIST 18012021.csv":
        print('blob checking: {}'.format(event['name']))

        def inner_function():
            from google.cloud import bigquery

            # Construct a BigQuery client object.
            client = bigquery.Client()

            # TODO(developer): Set table_id to the ID of the table to create.
            table_id = "project-a-307309.DatasetA.TableE"

            client.delete_table(table_id, not_found_ok=True)  # Make an API request.
            print("Deleted table '{}'.".format(table_id))

            job_config = bigquery.LoadJobConfig(
                schema=[
                    bigquery.SchemaField("ID", "STRING"),
                    bigquery.SchemaField("STATUS", "STRING"),
                    bigquery.SchemaField("PRIORITY", "STRING"),
                    bigquery.SchemaField("BRAND", "STRING"),
                    bigquery.SchemaField("FLIGHT_NAME", "STRING"),
                    bigquery.SchemaField("FLIGHT_TYPE", "STRING"),
                    bigquery.SchemaField("City", "STRING"),
                    bigquery.SchemaField("Destination_Tier", "STRING"),
                    bigquery.SchemaField("Country", "STRING"),
                    bigquery.SchemaField("Management_Type", "STRING"),
                    bigquery.SchemaField("Area", "STRING"),
                    bigquery.SchemaField("Seat_Count", "STRING"),
                    bigquery.SchemaField("Tier", "STRING"),
                    bigquery.SchemaField("New_Segment", "STRING"),
                    bigquery.SchemaField("Running_Date", "STRING"),
                    bigquery.SchemaField("Expected_Opening", "STRING"),
                    bigquery.SchemaField("New_Updated_Opening", "STRING"),
                    bigquery.SchemaField("COMMENT", "STRING"),
                    bigquery.SchemaField("URL", "STRING"),
                ],
                skip_leading_rows=6,
                # The source format defaults to CSV, so the line below is optional.
                source_format=bigquery.SourceFormat.CSV,
            )

            uri = "gs://dummy_flight_details/DUMMY FLIGHT DETAIL LIST 18012021.csv"
            load_job = client.load_table_from_uri(
                uri, table_id, job_config=job_config
            )  # Make an API request.
            load_job.result()  # Waits for the job to complete.

            destination_table = client.get_table(table_id)  # Make an API request.
            print("Loaded {} rows.".format(destination_table.num_rows))

        inner_function()

        def outer_func():
            from google.cloud import storage
            import time

            ts = time.time()
            print("ts")
            print(ts)

            bucket_name = "dummy_flight_details"
            blob_name = "DUMMY FLIGHT DETAIL LIST 18012021.csv"
            new_name = "DUMMY FLIGHT DETAIL LIST 18012021.csv".format(ts)

            def rename_blob():
                print("function entered")
                storage_client = storage.Client()
                bucket = storage_client.bucket(bucket_name)
                blob = bucket.blob(blob_name)
                new_blob = bucket.rename_blob(blob, new_name)
                print("Blob {} has been renamed to {}".format(blob.name, new_blob.name))

            rename_blob()

        outer_func()

requirements.txt

# Function dependencies, for example:
# package>=version
google-cloud-bigquery==2.11.0
google-cloud-storage==1.35.0

Log

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/functions_framework/__init__.py", line 149, in view_func
    function(data, context)
  File "/workspace/main.py", line 52, in hello_gcs
    inner_function()
  File "/workspace/main.py", line 49, in inner_function
    load_job.result()  # Waits for the job to complete.
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 662, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/google/api_core/future/polling.py", line 134, in result
    raise self._exception
google.api_core.exceptions.Forbidden: 403 Exceeded rate limits: too many table update operations for this table. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors
I believe that @guillaume has the right answer. To confirm, review the BigQuery logs to see what jobs are created. – John Hanley

2 Answers

2 votes

As far as I understand your code, you have created an infinite loop, and that is why you are hitting the rate limit.

Have a closer look at your code. First, you said "the function is triggered every time someone puts a file in the bucket". OK, no worries with that.

BUT, at the end you perform this:

new_blob = bucket.rename_blob(blob, new_name)

Rename doesn't really exist: it is only a copy of the old blob to the new one, followed by a delete of the old one (if the old name is different from the new one).

So a copy is the creation of a new blob, and therefore GCS fires a new event that runs your function again.
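
For reference, here is a minimal sketch of what that "rename" roughly amounts to with the google-cloud-storage client (the bucket and blob names are the ones from the question; the conditional delete mirrors the behavior described above, not a verbatim copy of the library code):

# Sketch: a "rename" in GCS is a copy followed by (possibly) a delete.
from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.bucket("dummy_flight_details")
blob = bucket.blob("DUMMY FLIGHT DETAIL LIST 18012021.csv")

new_name = "some_new_name.csv"  # hypothetical target name
# The copy creates a new object, so GCS fires a new finalize event.
new_blob = bucket.copy_blob(blob, bucket, new_name)
# The original is only deleted when the name actually changed.
if new_name != blob.name:
    blob.delete()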

And because of a bug in your code, the new blob name is the same as the initial one:

new_name = "DUMMY FLIGHT DETAIL LIST 18012021.csv".format(ts)

The braces are missing in the string, so format() does nothing there!


Same name: it passes your IF check, the same file name is created ("renamed") in a loop, and so events fire in a loop as well.

This has no relation to the file size or number of rows.
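
A minimal sketch of one way to break the loop (assuming you really do want the processed file renamed): put the timestamp into the new name so the renamed object no longer matches your IF check and the copy event does not re-trigger the load:

# Hypothetical fix: include the timestamp in the new object name so the
# renamed file no longer matches `if event['name'] == "DUMMY FLIGHT DETAIL LIST 18012021.csv"`.
new_name = "DUMMY FLIGHT DETAIL LIST 18012021_{}.csv".format(ts)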

0 votes

You may want to check this page: BigQuery - Quotas and Limits.

And check how many times, and how frequently, your Cloud Function is called...
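
As a rough sketch, you could count recent invocations from the logs with the google-cloud-logging client (the function name hello_gcs is taken from the question; the filter string is an assumption about how your function's log entries are labeled):

# Sketch: list recent Cloud Functions log entries to see how often the function ran.
from google.cloud import logging

client = logging.Client()
log_filter = (
    'resource.type="cloud_function" '
    'AND resource.labels.function_name="hello_gcs"'
)
entries = list(client.list_entries(filter_=log_filter, page_size=100))
print("Recent log entries: {}".format(len(entries)))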