All,
I'm trying to learn how to use GCP Pub/Sub. I can exercise it from the CLI (create topics and subscriptions, publish to a topic, pull from a subscription, etc.), but when I move over to Python (v2.7, the current company standard) I'm struggling to pull messages synchronously.
I've reviewed this page, which tells you to loop with while True and a sleep, but I can't imagine anyone does that in the real world, right? https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-subscribe-python
This page says you can use future.result(), which I tried, but it doesn't block the session/thread the way you'd expect: http://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/index.html
Does anyone have any other ideas? Here is my function, which is pretty much straight out of one of the examples:
    # Imports assumed from context; psub is taken to be an alias for pubsub_v1.
    import google.auth
    from google.cloud import pubsub_v1 as psub


    def sample_receive_messages(subscription_name="my-sub", project=None):
        """Receives messages from a pull subscription."""
        if not project:
            # Fall back to the project attached to the default credentials.
            credentials, project = google.auth.default()
        subscriber = psub.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project, subscription_name)

        def callback(message):
            # print('Received message: {}'.format(message))
            message.ack()
            print('<message>' + str(message.data) + '</message>')

        subscription = subscriber.subscribe(subscription_path)
        future = subscription.open(callback)
        # Intended to block here while messages are delivered to callback.
        myResult = future.result()
        subscription.close()
        print("done")
My end goal is a process that wakes up every so often, grabs the pending messages and ACKs them, writes them out to a file, and then exits.
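To make that goal concrete, here is a minimal sketch of the control flow I have in mind. It deliberately avoids the Pub/Sub client: pull_batch and ack are hypothetical callables standing in for whatever the library actually provides, and FakeSubscription is an in-memory placeholder used only to exercise the loop. None of these names are real google-cloud-pubsub API.

```python
from __future__ import print_function

import os
import tempfile


def drain_to_file(pull_batch, ack, out_path, max_batches=10):
    """Pull until nothing is left, append each message to a file, then return.

    pull_batch() -> list of (ack_id, data) tuples; an empty list means
    nothing is pending. ack(ack_ids) acknowledges those ids.
    Both callables are hypothetical stand-ins, not library calls.
    """
    written = 0
    with open(out_path, "a") as out:
        for _ in range(max_batches):
            batch = pull_batch()
            if not batch:
                break  # nothing pending: stop instead of waiting forever
            for _ack_id, data in batch:
                out.write(data + "\n")
            out.flush()
            # Ack only after writing, so unwritten messages stay unacknowledged.
            ack([ack_id for ack_id, _data in batch])
            written += len(batch)
    return written


class FakeSubscription(object):
    """In-memory stand-in for a subscription (illustration only)."""

    def __init__(self, messages, batch_size=2):
        self.pending = list(enumerate(messages))
        self.acked = []
        self.batch_size = batch_size

    def pull_batch(self):
        # Return up to batch_size messages that have not been acked yet.
        return self.pending[:self.batch_size]

    def ack(self, ack_ids):
        self.acked.extend(ack_ids)
        self.pending = [m for m in self.pending if m[0] not in ack_ids]


if __name__ == "__main__":
    fake = FakeSubscription(["a", "b", "c"])
    path = os.path.join(tempfile.mkdtemp(), "messages.txt")
    count = drain_to_file(fake.pull_batch, fake.ack, path)
    print("wrote", count, "messages")
```

The point of the sketch is the exit condition: the loop ends as soon as a pull comes back empty (or after max_batches), rather than blocking on a future that never completes.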
As of now, sample_receive_messages reads the messages and prints them out (great), but then it sits and sits and finally errors out with some gibberish:
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "pubSubTools.py", line 50, in sample_receive_messages
        myResult = future.result()
      File "/usr/local/lib/python2.7/site-packages/google/cloud/pubsub_v1/futures.py", line 98, in result
        raise err
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>