
I have Pub/Sub subscribe logic wrapped inside a subscribe method that is being called once during service initialization for every subscription:

    def subscribe(self,
                  callback: typing.Callable,
                  subscription_name: str,
                  topic_name: str,
                  project_name: typing.Optional[str] = None) -> typing.Optional[SubscriberClient]:
        """Subscribes to a Pub/Sub topic and returns the subscriber client

        :param callback: subscription callback method
        :param subscription_name: name of the subscription
        :param topic_name: name of the topic
        :param project_name: optional project name. Uses default project if not set
        :return: subscriber client or None if testing
        """
        project = project_name if project_name else self.pubsub_project_id
        self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))

        project_path = self.pubsub_subscriber.project_path(project)
        topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
        subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)

        # check if there is an existing subscription, if not, create it
        if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
            self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
            self.pubsub_subscriber.create_subscription(subscription_path, topic_path)

        # subscribe to the topic
        self.pubsub_subscriber.subscribe(
            subscription_path, callback=callback,
            scheduler=self.thread_scheduler
        )
        return self.pubsub_subscriber

This method is called like this:

        self.subscribe_client = self.subscribe(
            callback=self.pubsub_callback,
            subscription_name='subscription_topic',
            topic_name='topic'
        )

The callback method does a bunch of work, sends two emails, then acknowledges the message:

    def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
        self.logger.debug('Processing pub sub message')

        try:
            self.do_something_with_message(data)

            self.logger.debug('Acknowledging the message')
            data.ack()
            self.logger.debug('Acknowledged')
            return

        except Exception:
            self.logger.warning({
                "message": "Failed to process Pub/Sub message",
                "request_size": data.size,
                "data": data.data
            }, exc_info=True)

        self.logger.debug('Acknowledging the message 2')
        data.ack()

When I push something to the subscription, the callback runs and prints all the debug messages, including "Acknowledged". The message, however, stays in Pub/Sub: the callback gets called again, and the delay grows exponentially after each retry. What could cause the message to stay in Pub/Sub even after ack() is called?

I have several such subscriptions, and all of them work as expected. The ack deadline is not the cause: the callback finishes almost immediately, and I experimented with the ack deadline anyway; nothing helped.

When I process these messages from a locally running app connected to the same Pub/Sub, it completes just fine and the acknowledgement removes the message from the queue as expected.

  • So the problem manifests only in the deployed service (running inside a Kubernetes pod).
  • The callback executes, but ack() seemingly does nothing.
  • Acking messages from a script running locally (and doing exactly the same work) or through the GCP UI works as expected.

Any ideas?


2 Answers

3 votes

Acknowledgements are best-effort in Pub/Sub, so it's possible but unusual for messages to be redelivered.

If you are consistently receiving duplicates, it might be due to duplicate publishes of the same message contents. As far as Pub/Sub is concerned, these are different messages and will be assigned different message IDs. Check the Pub/Sub-provided message IDs to ensure that you are actually receiving the same message multiple times.
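A minimal sketch of that check, assuming only that each delivered object exposes a message_id attribute (as the client library's subscriber Message does). RedeliveryTracker is a hypothetical helper, not part of the library: call record() from the callback before acking. The same ID seen twice is a true redelivery; the same content under new IDs points to duplicate publishes. The redelivered IDs are also the sample you would hand to support.

```python
import threading
from collections import Counter
from types import SimpleNamespace
from typing import Any, List


class RedeliveryTracker:
    """Hypothetical helper: counts deliveries per Pub/Sub message ID."""

    def __init__(self) -> None:
        self._counts: Counter = Counter()
        # Callbacks run on scheduler worker threads, so guard the shared counter.
        self._lock = threading.Lock()

    def record(self, message: Any) -> int:
        """Record one delivery and return how often this ID has been seen so far."""
        with self._lock:
            self._counts[message.message_id] += 1
            return self._counts[message.message_id]

    def redelivered_ids(self) -> List[str]:
        """IDs delivered more than once, sorted for stable output."""
        with self._lock:
            return sorted(mid for mid, n in self._counts.items() if n > 1)


# Usage with stand-in objects; in a real callback, pass the Message itself.
tracker = RedeliveryTracker()
for mid in ["101", "102", "101"]:
    tracker.record(SimpleNamespace(message_id=mid))
print(tracker.redelivered_ids())  # ['101']
```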

There is an edge case in dealing with large backlogs of small messages with streaming pull (which is what the Python client library uses). If you are running multiple clients subscribing on the same subscription, this edge case may be relevant.

You can also check your subscription's Stackdriver metrics to see:

  • if its acks are being sent successfully (subscription/ack_message_count)
  • if its backlog is decreasing (subscription/backlog_bytes)
  • if your subscriber is missing the ack deadline (subscription/streaming_pull_ack_message_operation_count filtered by response_code != "success")
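To query the same three signals programmatically, each metric can be selected with a Cloud Monitoring filter string. The sketch below only builds those strings; subscription_filter is a hypothetical helper, and the NOT-based response_code clause is an assumption to verify against the Monitoring filter documentation. The resulting string would go into the filter field of a time-series query (for example list_time_series in the google-cloud-monitoring library).

```python
from typing import Optional

# Full metric types behind the three checks listed above.
ACK_COUNT = "pubsub.googleapis.com/subscription/ack_message_count"
BACKLOG_BYTES = "pubsub.googleapis.com/subscription/backlog_bytes"
ACK_OPERATIONS = (
    "pubsub.googleapis.com/subscription/streaming_pull_ack_message_operation_count"
)


def subscription_filter(metric_type: str, subscription_id: str,
                        extra: Optional[str] = None) -> str:
    """Hypothetical helper: filter one metric down to one subscription."""
    clauses = [
        f'metric.type = "{metric_type}"',
        'resource.type = "pubsub_subscription"',
        f'resource.labels.subscription_id = "{subscription_id}"',
    ]
    if extra:
        clauses.append(extra)
    return " AND ".join(clauses)


# Failed ack operations only (assumed syntax): response codes other than "success".
failed_acks = subscription_filter(
    ACK_OPERATIONS, "topic_a_sub",
    extra='NOT metric.labels.response_code = "success"',
)
print(failed_acks)
```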

If you're not missing the ack deadline and your backlog is remaining steady, you should contact Google Cloud support with your project name, subscription name, and a sample of the duplicate message IDs. They will be able to investigate why these duplicates are happening.

0 votes

I did some additional testing and I finally found the problem.

TL;DR: I was using the same google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler instance for all subscriptions.

Here are the snippets of the code I used to test it. This is the broken version:

server.py

    import concurrent.futures.thread
    import os
    import time

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler


    def create_subscription(project_id, topic_name, subscription_name):
        """Create a new pull subscription on the given topic."""
        subscriber = pubsub_v1.SubscriberClient()
        topic_path = subscriber.topic_path(project_id, topic_name)
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)

        subscription = subscriber.create_subscription(
            subscription_path, topic_path)

        print('Subscription created: {}'.format(subscription))


    def receive_messages(project_id, subscription_name, t_scheduler):
        """Receives messages from a pull subscription."""
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)

        def callback(message):
            print('Received message: {}'.format(message.data))
            message.ack()

        subscriber.subscribe(subscription_path, callback=callback, scheduler=t_scheduler)
        print('Listening for messages on {}'.format(subscription_path))


    project_id = os.getenv("PUBSUB_PROJECT_ID")

    publisher = pubsub_v1.PublisherClient()
    project_path = publisher.project_path(project_id)

    # Create both topics
    try:
        topics = [topic.name.split('/')[-1] for topic in publisher.list_topics(project_path)]
        if 'topic_a' not in topics:
            publisher.create_topic(publisher.topic_path(project_id, 'topic_a'))
        if 'topic_b' not in topics:
            publisher.create_topic(publisher.topic_path(project_id, 'topic_b'))
    except AlreadyExists:
        print('Topics already exist')

    # Create subscriptions on both topics
    sub_client = pubsub_v1.SubscriberClient()
    project_path = sub_client.project_path(project_id)

    try:
        subs = [sub.name.split('/')[-1] for sub in sub_client.list_subscriptions(project_path)]
        if 'topic_a_sub' not in subs:
            create_subscription(project_id, 'topic_a', 'topic_a_sub')
        if 'topic_b_sub' not in subs:
            create_subscription(project_id, 'topic_b', 'topic_b_sub')
    except AlreadyExists:
        print('Subscriptions already exist')

    # BUG: this single ThreadScheduler is shared by both subscriptions below.
    scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))

    receive_messages(project_id, 'topic_a_sub', scheduler)
    receive_messages(project_id, 'topic_b_sub', scheduler)

    # Keep the main thread alive; the subscribers work on background threads.
    while True:
        time.sleep(60)

client.py

    import datetime
    import os
    import random
    import sys
    from time import sleep

    from google.cloud import pubsub_v1


    def publish_messages(pid, topic_name):
        """Publishes multiple messages to a Pub/Sub topic."""
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(pid, topic_name)

        for n in range(1, 10):
            data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
            data = data.encode('utf-8')
            publisher.publish(topic_path, data=data)
            sleep(random.randint(10, 50) / 10.0)


    project_id = os.getenv("PUBSUB_PROJECT_ID")
    publish_messages(project_id, sys.argv[1])

I connected to Cloud Pub/Sub, and the server created the topics and subscriptions. Then I ran the client script multiple times in parallel for both topics. With the shared scheduler, the messages were not cleared from the subscriptions. Once I changed the server code to instantiate a new thread scheduler inside the receive_messages scope, the server drained both subscriptions after a short while and functioned as expected.

The confusing thing is that in both cases the server printed "Received message" for every message.
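The fix can be sketched as a helper that gives every subscription its own scheduler instead of sharing one. To keep the sketch runnable without credentials, the subscriber and the scheduler factory are injected; subscribe_all and FakeSubscriber are hypothetical names. With the real library, subscriber would be a pubsub_v1.SubscriberClient and make_scheduler something like lambda: ThreadScheduler(ThreadPoolExecutor(10)), mirroring the server.py above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def subscribe_all(subscriber: Any,
                  callbacks: Dict[str, Callable[[Any], None]],
                  make_scheduler: Callable[[], Any]) -> Dict[str, Any]:
    """Open one streaming pull per subscription path, each with a fresh scheduler."""
    futures = {}
    for subscription_path, callback in callbacks.items():
        # The factory is called inside the loop: one scheduler per subscription.
        futures[subscription_path] = subscriber.subscribe(
            subscription_path, callback=callback, scheduler=make_scheduler())
    return futures


class FakeSubscriber:
    """Stand-in for SubscriberClient that records the scheduler each call received."""

    def __init__(self) -> None:
        self.schedulers: Dict[str, Any] = {}

    def subscribe(self, subscription_path, callback, scheduler=None):
        self.schedulers[subscription_path] = scheduler
        return "future-for-" + subscription_path


fake = FakeSubscriber()
futures = subscribe_all(
    fake,
    {"projects/p/subscriptions/topic_a_sub": print,
     "projects/p/subscriptions/topic_b_sub": print},
    # Stand-in factory; with the real library, wrap the executor in ThreadScheduler.
    make_scheduler=lambda: ThreadPoolExecutor(max_workers=10),
)
a, b = fake.schedulers.values()
print(a is b)  # False: each subscription got its own pool
for pool in fake.schedulers.values():
    pool.shutdown()
```

Returning the futures is a deliberate choice: the original server.py discards the StreamingPullFuture that subscribe() returns, whereas keeping it lets you call cancel() on each stream during shutdown.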

I am going to post this to https://github.com/googleapis/google-cloud-python/issues