0
votes

I'm trying to set up two Google Pub/Sub subscribers on different subscriptions within the same code. To paint a better picture: I have topic1 and topic2, with subscription1 subscribed to topic1 and subscription2 subscribed to topic2. subscriber1 is linked to subscription1 and subscriber2 is linked to subscription2. My question is: how can I use subscriber1 and subscriber2 in the same application? My example for just one subscriber is (from the documentation):

project_id = "my-project-id"
subscription_id = "subscription1"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
logging.info("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()

How can I add subscription2 to this so that my Python application receives messages from both topic1 and topic2? I couldn't find it in the docs, but if I'm just missing it somehow, let me know!

1 Answer

2
votes

If you want to receive messages from two subscriptions at the same time, you can create two SubscriberClient instances, one for each subscription. To wait on both streaming pull futures together, you could use a threading.Event that is set as soon as either future finishes:

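import logging
import threading

from google.cloud import pubsub_v1

# `callback` is your message-handling function (not shown here).
project_id = "my-project-id"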
subscription_id1 = "subscription1"
subscription_id2 = "subscription2"

subscriber1 = pubsub_v1.SubscriberClient()
subscriber2 = pubsub_v1.SubscriberClient()
subscription_path1 = subscriber1.subscription_path(project_id, subscription_id1)
subscription_path2 = subscriber2.subscription_path(project_id, subscription_id2)
streaming_pull_future1 = subscriber1.subscribe(subscription_path1, callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path1))
streaming_pull_future2 = subscriber2.subscribe(subscription_path2, callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path2))

subscriber_shutdown = threading.Event()
streaming_pull_future1.add_done_callback(lambda result: subscriber_shutdown.set())
streaming_pull_future2.add_done_callback(lambda result: subscriber_shutdown.set())

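# Wrap both subscribers in a 'with' block to automatically call close() when done.
with subscriber1, subscriber2:
    # Block until either streaming pull future finishes (for example, on an error),
    # then cancel both so that neither stream keeps running on its own.
    subscriber_shutdown.wait()
    streaming_pull_future1.cancel()
    streaming_pull_future2.cancel()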
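
To see the Event pattern in isolation, here is a minimal sketch that runs without a Pub/Sub project. It uses plain concurrent.futures.Future objects as stand-ins for the streaming pull futures, on the assumption that those expose the same add_done_callback() and cancel() methods. The helper name wait_for_first and the simulated failure are hypothetical and only there for illustration:

```python
import threading
from concurrent.futures import Future


def wait_for_first(futures, timeout=None):
    """Block until any future finishes (or the timeout passes), then cancel all.

    Hypothetical helper: returns True if a future finished before the timeout.
    """
    shutdown = threading.Event()
    for future in futures:
        # The callback receives the finished future; we only need the signal.
        future.add_done_callback(lambda _done: shutdown.set())
    finished = shutdown.wait(timeout)
    for future in futures:
        # cancel() has no effect on a future that has already finished.
        future.cancel()
    return finished


# Stand-ins for streaming_pull_future1 and streaming_pull_future2.
future1 = Future()
future2 = Future()

# Simulate the first stream failing shortly after start-up.
threading.Timer(
    0.1, future1.set_exception, args=[RuntimeError("stream 1 failed")]
).start()

finished = wait_for_first([future1, future2], timeout=5)
```

With the real client, you would pass the two streaming pull futures instead of the stand-ins and keep the call inside the 'with subscriber1, subscriber2:' block, so both clients are closed afterwards.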