3
votes

All,

I'm trying to learn how to use GCP Pub/Sub. I can exercise it through the CLI commands (create topics and subscriptions, publish to a topic, pull from a subscription, etc.), but when I switch to Python (v2.7, the current company standard) I'm struggling to pull messages synchronously.

I've reviewed this URL, which tells you to use a sleep inside a while True loop, but I can't imagine anyone is doing that in the real world, right? https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-subscribe-python

This URL says you can use future.result(), which I tried, but it doesn't block the session/thread the way you'd expect: http://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/index.html

Does anyone have any other ideas? Here is my function, which is pretty much straight out of one of the examples:

import google.auth
from google.cloud import pubsub_v1 as psub  # assumed alias; the original import was not shown

def sample_receive_messages(subscription_name="my-sub", project=None):
    """Receives messages from a pull subscription."""
    if not project:
        # Fall back to the project of the default credentials.
        credentials, project = google.auth.default()

    subscriber = psub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        # print('Received message: {}'.format(message))
        message.ack()
        print('<message>' + str(message.data) + '</message>')

    subscription = subscriber.subscribe(subscription_path)

    future = subscription.open(callback)

    # Expected to block here until the subscription is done.
    myResult = future.result()

    subscription.close()

    print("done")

My goal at the end of this is to have a process that wakes up every so often, grabs messages and ACKs them, writes the messages out to a file, then ends.

As of now, the process reads the messages and prints them out (great), but it then sits and sits and sits and finally errors out with some gibberish:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pubSubTools.py", line 50, in sample_receive_messages
    myResult = future.result()
  File "/usr/local/lib/python2.7/site-packages/google/cloud/pubsub_v1/futures.py", line 98, in result
    raise err
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
2
The other clients (Ruby, PHP, and the command-line interface) don't appear to need to be told not to wait, which is what I wish the Python option did. - Rich Murnane

2 Answers

3
votes

This is described in the library documentation:

from google.cloud.pubsub_v1 import SubscriberClient

pubsub_client = SubscriberClient()
# project_name and subscription_name are your own project ID and subscription ID.
subscription_path = 'projects/{project}/subscriptions/{subscription}'.format(project=project_name, subscription=subscription_name)
pull_response = pubsub_client.pull(subscription=subscription_path, max_messages=10)

for msg in pull_response.received_messages:
    message = msg.message.data.decode('utf-8')
    # do your thing
    pubsub_client.acknowledge(subscription=subscription_path, ack_ids=[msg.ack_id])
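
For the "wake up, grab messages, write them to a file, then end" workflow from the question, the pull call above could be wrapped in a loop. What follows is only a minimal sketch, not a recipe from the library docs: drain_to_file, out_path, batch_size and empty_errors are hypothetical names, and it assumes the client accepts pull(subscription=..., max_messages=...) and acknowledge(subscription=..., ack_ids=...) as in the snippet above. Depending on the library version, a pull against an empty subscription may block for a while or raise a deadline error rather than return an empty response, so the sketch lets you pass the exception classes that should count as "nothing left".

```python
def drain_to_file(client, subscription_path, out_path, batch_size=10, empty_errors=()):
    """Pull until the subscription looks empty, appending each payload to out_path.

    client is assumed to behave like the SubscriberClient used above.
    empty_errors is a tuple of exception classes to treat as "no more messages".
    Returns the number of messages written and acknowledged.
    """
    written = 0
    with open(out_path, "ab") as out:
        while True:
            try:
                response = client.pull(subscription=subscription_path,
                                       max_messages=batch_size)
            except empty_errors:
                break  # caller told us this error means "subscription is empty"
            if not response.received_messages:
                break  # empty response: nothing left to drain
            ack_ids = []
            for received in response.received_messages:
                out.write(received.message.data + b"\n")
                ack_ids.append(received.ack_id)
            # Flush before acking so a crash cannot lose acknowledged messages.
            out.flush()
            client.acknowledge(subscription=subscription_path, ack_ids=ack_ids)
            written += len(ack_ids)
    return written
```

Writing before acknowledging means a crash can produce duplicate lines on the next run, but not lost messages.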
0
votes

The Python library does not explicitly support synchronous pulling in the Cloud Pub/Sub API. future.result() is the recommended way to block, but the pull itself still happens asynchronously.

I recommend trying Python's standard-library Queue class: the callback calls Queue.put() for each message, and your main thread consumes them with Queue.get().
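
A rough sketch of that queue hand-off, using only the standard library, might look like the following. The names make_callback, drain and idle_timeout are made up for illustration, and the idea of stopping once no message has arrived for idle_timeout seconds is my assumption about what "done" should mean for a batch-style job.

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7

def make_callback(inbox):
    """Build a subscriber callback that only hands messages to the main thread."""
    def callback(message):
        inbox.put(message)
    return callback

def drain(inbox, idle_timeout=5.0):
    """Collect messages from inbox until none arrives for idle_timeout seconds."""
    messages = []
    while True:
        try:
            messages.append(inbox.get(timeout=idle_timeout))
        except queue.Empty:
            return messages
```

You would pass make_callback(inbox) wherever the question passes callback, call drain(inbox) from the main thread, write out and ack each returned message, and then close the subscription yourself instead of waiting on future.result().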