
Problem: I want to receive messages from Google Cloud Pub/Sub one message at a time using the Python API. All the current examples use the asynchronous (callback) option for pulling messages from a Pub/Sub subscription. The problem with that approach is that I need to keep the thread alive.

Is it possible to receive just one message and then close the connection? That is, is there a parameter (something like max_messages) that I can set to 1 so that the thread terminates once it has received one message?

The documentation doesn't list anything for synchronous pull in Python, although other languages such as Java seem to have a num_of_messages option.


2 Answers


See the following example:

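from google.cloud import pubsub_v1

client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
max_messages = 1

# google-cloud-pubsub >= 2.0 takes a request dict instead of positional arguments
response = client.pull(request={"subscription": subscription, "max_messages": max_messages})
for received in response.received_messages:
    print(received.message.data)
    # Acknowledge the message; otherwise it is redelivered after the ack deadline
    client.acknowledge(request={"subscription": subscription, "ack_ids": [received.ack_id]})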

I've tried it myself, and with this you get one message at a time.
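If you need this in more than one place, the same request can be wrapped in a small helper. This is a sketch, not an official API. `pull_one` is a hypothetical name, and it assumes google-cloud-pubsub 2.x, where `pull` and `acknowledge` take a `request` dict. It returns the payload of the single message, or `None` when the server replies with no messages. It also acknowledges the message so that it is not redelivered.

```python
from typing import Optional


def pull_one(client, subscription_path: str) -> Optional[bytes]:
    """Synchronously pull at most one message, ack it, and return its payload.

    Assumes `client` is a google.cloud.pubsub_v1.SubscriberClient (2.x,
    request-dict call style) or any object with the same two methods.
    """
    response = client.pull(
        request={"subscription": subscription_path, "max_messages": 1}
    )
    if not response.received_messages:
        # The server may legitimately reply with zero messages.
        return None
    received = response.received_messages[0]
    # Acknowledge so Pub/Sub does not redeliver it after the ack deadline.
    client.acknowledge(
        request={"subscription": subscription_path, "ack_ids": [received.ack_id]}
    )
    return received.message.data
```

A typical call would create `client = pubsub_v1.SubscriberClient()` and `path = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')`, then run `data = pull_one(client, path)`. Passing the client in, rather than creating it inside the function, keeps the helper easy to test with a stand-in object.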

If you get an error, try updating the Pub/Sub library to the latest version:

pip install --upgrade google-cloud-pubsub

The documentation has more information about the pull method used in the snippet:

The Pull method relies on a request/response model:

The application sends a request for messages. The server replies with zero or more messages and closes the connection.
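A single pull can come back empty. A caller that wants to block until exactly one message arrives therefore has to retry. The following is a minimal polling sketch. It assumes `pull_fn` is a hypothetical zero-argument callable that performs one synchronous pull with `max_messages` set to 1 and returns the message payload or `None`. The name `wait_for_one` is also illustrative.

```python
import time
from typing import Callable, Optional


def wait_for_one(
    pull_fn: Callable[[], Optional[bytes]],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> Optional[bytes]:
    """Call pull_fn until it returns a message or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        data = pull_fn()
        if data is not None:
            return data
        if time.monotonic() >= deadline:
            return None  # gave up: no message arrived in time
        # Pause between empty pulls instead of calling the API back-to-back.
        time.sleep(interval)
```

The `timeout` parameter bounds the total wait, so the process can still exit cleanly when the subscription stays empty. The `interval` parameter avoids hammering the API with back-to-back empty pulls.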


According to the official documentation:

...you can achieve exactly once processing of Pub/Sub message streams, as PubsubIO de-duplicates messages based on custom message identifiers or identifiers assigned by Pub/Sub.

So you should be able to use record IDs (identifiers for your messages) to allow exactly-once processing across the boundary between Dataflow and other systems. To use record IDs, invoke idLabel when constructing PubsubIO.Read or PubsubIO.Write transforms, passing a string value of your choice. In Java this would be:

public PubsubIO.Read.Bound<T> idLabel(String idLabel)

This returns a transform that's like this one but that reads unique message IDs from the given message attribute.
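The following small in-memory Python sketch illustrates what de-duplication by a custom identifier means. It shows only the idea and does not reflect how Dataflow implements it. Dataflow de-duplicates on the service side, whereas this sketch keeps every seen ID in an unbounded set. `dedupe_by_id_label` is a hypothetical name, messages are modelled as `(data, attributes)` tuples, and passing through messages that lack the attribute is a choice made here for illustration. In the Apache Beam Python SDK, the corresponding option is the `id_label` parameter of `apache_beam.io.ReadFromPubSub`.

```python
from typing import Iterable, Iterator, Mapping, Tuple

# A message is modelled as (payload, attributes) for this illustration.
Message = Tuple[bytes, Mapping[str, str]]


def dedupe_by_id_label(messages: Iterable[Message], id_label: str) -> Iterator[Message]:
    """Yield each message whose `id_label` attribute has not been seen before.

    Messages that lack the attribute are passed through unchanged.
    """
    seen = set()
    for data, attributes in messages:
        record_id = attributes.get(id_label)
        if record_id is not None:
            if record_id in seen:
                continue  # duplicate delivery of the same record: drop it
            seen.add(record_id)
        yield data, attributes
```

In this sketch, the publisher is responsible for putting a unique value into the chosen attribute. Pub/Sub itself may deliver the same message more than once, and the label is what lets the reader recognise those repeats.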