1
votes

There seems to be an issue in google-cloud-pubsub==0.39.1 google-api-python-client==1.7.8

In which doing a pull in a loop when the credentials goes bad causes a flood of python3 1456 root 71u IPv4 46501 0t0 TCP XXX-XXXXX-XXXX:47074->YYYYYYYY-YY-YYYYY.1e100.net:https (CLOSE_WAIT)files to remain open and eventually cause a "too many files open" issue.

The problem does not appear to be in pubsub itself but in gRPC.

May 8 22:34:41 .sh[17736]: Traceback (most recent call last): May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable May 8 22:34:41 .sh[17736]: return callable_(*args, **kwargs) May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/grpc/_channel.py", line 562, in call May 8 22:34:41 .sh[17736]: return _end_unary_response_blocking(state, call, False, None) May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking May 8 22:34:41 .sh[17736]: raise _Rendezvous(state, None, None, deadline) May 8 22:34:41 .sh[17736]: grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: May 8 22:34:41 .sh[17736]: #011status = StatusCode.UNAVAILABLE May 8 22:34:41 .sh[17736]: #011details = "Getting metadata from plugin failed with error: ('invalid_grant: Invalid JWT Signature.', '{\n "error": "invalid_grant",\n "error_description": "Invalid JWT Signature."\n}')" May 8 22:34:41 .sh[17736]: #011debug_error_string = "{"created":"@1557354881.258250528","description":"Getting metadata from plugin failed with error: ('invalid_grant: Invalid JWT Signature.', '{\n "error": "invalid_grant",\n "error_description": "Invalid JWT Signature."\n}')","file":"src/core/lib/security/credentials/plugin/plugin_credentials.cc","file_line":79,"grpc_status":14}" May 8 22:34:41 .sh[17736]: > May 8 22:34:41 .sh[17736]: The above exception was the direct cause of the following exception: May 8 22:34:41 .sh[17736]: Traceback (most recent call last): May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/lt/cloud/cloudpull.py", line 113, in subscribeToStuff May 8 22:34:41 .sh[17736]: pull_response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES, timeout=60, retry=None) May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/google/cloud/pubsub_v1/_gapic.py", line 45, in May 8 22:34:41 .sh[17736]: fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw) # noqa May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/google/cloud/pubsub_v1/gapic/subscriber_client.py", line 860, in pull May 8 22:34:41 .sh[17736]: request, retry=retry, timeout=timeout, metadata=metadata May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/google/api_core/gapic_v1/method.py", line 143, in call May 8 22:34:41 .sh[17736]: return wrapped_func(*args, **kwargs) May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/google/api_core/timeout.py", line 102, in func_with_timeout May 8 22:34:41 .sh[17736]: return func(*args, **kwargs) May 8 22:34:41 .sh[17736]: File "/opt///lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable May 8 22:34:41 .sh[17736]: six.raise_from(exceptions.from_grpc_error(exc), exc) May 8 22:34:41 .sh[17736]: File "", line 3, in raise_from May 8 22:34:41 .sh[17736]: google.api_core.exceptions.ServiceUnavailable: 503 Getting metadata from plugin failed with error: ('invalid_grant: Invalid JWT Signature.', '{\n "error": "invalid_grant",\n "error_description": "Invalid JWT Signature."\n}')

1

1 Answers

1
votes

https://github.com/googleapis/google-cloud-python/issues/5523

Notice at the bottom client.api.transport._channel.close()

A pubsub_v1.SubscriberClient falls into the client category.

So you can do like...

subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
try:
    subscription_path = subscriber.subscription_path(project, subscription)
    pull_response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES, timeout=60, retry=None)
    for msg in pull_response.received_messages:
        #do stuff
finally:
    subscriber.api.transport._channel.close()

You can do this in a loop and keep your process from opening too many files :)