0
votes

I'm using Google pubsublite. Small dummy topic with single partition and a few messages. Python client lib. Doing the standard SubscriberCluent.subscribe with callback. The callback places message in a queue. When the msg is taken out of the queue for consumption, its ack is called. When I want to stop, I call subscribe_future.cancel(); subscriber_future.result() and discard unconsumed messages in the queue.

Say I know the topic has 30 messages. I consume 10 of them before stopping. Then I restart a new SubscriberClient in the same subscription and receive messages. I expect to get starting with the 11th message, but I got starting with the first. So the precious subscriber has ack'd the first 10, but it's as if server did not receive the acknowledgement.

I thought maybe the ack needs some time to reach the server. So I waited 2 minutes before starting the second subscribe. Didn't help.

Then u thought maybe the subscriber object manages the ack calls, and I need to "flush" them before cancelling, but I found another about that.

What am I missing? Thanks.

Here's the code. If you have pubsublite account, the code is executable after you fill in credentials. The code shows two issues, one is the subject of this question; the other is asked at here


    # Using python 3.8
    
    from __future__ import annotations
    
    import logging
    import pickle
    import queue
    import time
    import uuid
    from concurrent.futures import ThreadPoolExecutor
    from contextlib import contextmanager
    from typing import Union, Optional
    
    from google.api_core.exceptions import AlreadyExists
    from google.cloud.pubsub_v1.types import BatchSettings
    from google.cloud.pubsublite import AdminClient, PubSubMessage
    from google.cloud.pubsublite import Reservation as GCPReservation
    from google.cloud.pubsublite import Subscription as GCPSubscription
    from google.cloud.pubsublite import Topic as GCPTopic
    from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
                                                     SubscriberClient)
    from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
                                               LocationPath,
                                               ReservationPath, SubscriptionPath,
                                               TopicPath,
                                               )
    from google.cloud.pubsublite.types import FlowControlSettings
    from google.oauth2.service_account import Credentials
    
    
    logging.getLogger('google.cloud').setLevel(logging.WARNING)
    
    logger = logging.getLogger(__name__)
    
    FORMAT = '[%(asctime)s.%(msecs)03d %(name)s]  %(message)s'
    logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
    
    
    class Account:
        def __init__(self,
                     project_id: str,
                     region: str,
                     zone: str,
                     credentials: Credentials,
                     ):
            self.project_id = project_id
            self.region = region
            self.zone = CloudZone.parse(zone)
            self.credentials = credentials
            self.client = AdminClient(region=region, credentials=credentials)
    
        def location_path(self) -> LocationPath:
            return LocationPath(self.project_id, self.zone)
    
        def reservation_path(self, name: str) -> ReservationPath:
            return ReservationPath(self.project_id, self.region, name)
    
        def topic_path(self, name: str) -> TopicPath:
            return TopicPath(self.project_id, self.zone, name)
    
        def subscription_path(self, name: str) -> SubscriptionPath:
            return SubscriptionPath(self.project_id, self.zone, name)
    
        def create_reservation(self, name: str, *, capacity: int = 32) -> None:
            path = self.reservation_path(name)
            reservation = GCPReservation(name=str(path),
                                         throughput_capacity=capacity)
            self.client.create_reservation(reservation)
            # logger.info('reservation %s created', name)
    
        def create_topic(self,
                         name: str,
                         *,
                         partition_count: int = 1,
                         partition_size_gib: int = 30,
                         reservation_name: str = 'default') -> Topic:
            # A topic name can not be reused within one hour of deletion.
            top_path = self.topic_path(name)
            res_path = self.reservation_path(reservation_name)
    
            topic = GCPTopic(
                name=str(top_path),
                partition_config=GCPTopic.PartitionConfig(count=partition_count),
                retention_config=GCPTopic.RetentionConfig(
                    per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024),
                reservation_config=GCPTopic.ReservationConfig(
                    throughput_reservation=str(res_path)))
    
            self.client.create_topic(topic)
            # logger.info('topic %s created', name)
    
            return Topic(name, self)
    
        def delete_topic(self, name: str) -> None:
            path = self.topic_path(name)
            self.client.delete_topic(path)
            # logger.info('topic %s deleted', name)
    
        def get_topic(self, name: str) -> Topic:
            return Topic(name, self)
    
    
    class Topic:
    
        def __init__(self, name: str, account: Account):
            self.account = account
            self.name = name
            self._path = self.account.topic_path(name)
    
        def create_subscription(self,
                                name: str,
                                *,
                                pos: str = None) -> Subscription:
            path = self.account.subscription_path(name)
    
            if pos is None or pos == 'beginning':
                starting_offset = BacklogLocation.BEGINNING
            elif pos == 'end':
                starting_offset = BacklogLocation.END
            else:
                raise ValueError(
                    'Argument start only accepts one of two values - "beginning" or "end"'
                )
    
            Conf = GCPSubscription.DeliveryConfig
            subscription = GCPSubscription(
                name=str(path),
                topic=str(self._path),
                delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY))
    
            self.account.client.create_subscription(subscription, starting_offset)
            # logger.info('subscription %s created for topic %s', name, self.name)
    
            return Subscription(name, self)
    
        def delete_subscription(self, name: str) -> None:
            path = self.account.subscription_path(name)
            self.account.client.delete_subscription(path)
            # logger.info('subscription %s deleted from topic %s', name, self.name)
    
        def get_subscription(self, name: str):
            return Subscription(name, self)
    
        @contextmanager
        def get_publisher(self, **kwargs):
            with Publisher(self, **kwargs) as pub:
                yield pub
    
    
    class Publisher:
        def __init__(self, topic: Topic, *, batch_size: int = 100):
            self.topic = topic
            self._batch_config = {
                    'max_bytes': 3 * 1024 * 1024,  # 3 Mb; must be  None:
            self._messages.put(data)
    
    
    class Subscription:
    
        def __init__(self, name: str, topic: Topic):
            self.topic = topic
            self.name = name
            self._path = topic.account.subscription_path(name)
    
        @contextmanager
        def get_subscriber(self, *, backlog=None):
            with Subscriber(self, backlog=backlog) as sub:
                yield sub
    
    
    class Subscriber:
        def __init__(self, subscription: Subscription, backlog: int = None):
            self.subscription = subscription
            self._backlog = backlog or 100
            self._cancel_requested: bool = None
            self._messages: queue.Queue = None
            self._pool: ThreadPoolExecutor = None
            self._NOMORE = object()
            self._subscribe_task = None
    
        def __enter__(self):
            self._pool = ThreadPoolExecutor(1).__enter__()
            self._messages = queue.Queue(self._backlog)
            messages = self._messages
    
            def callback(msg: PubSubMessage):
                logger.info('got %s', pickle.loads(msg.data))
                messages.put(msg)
    
            def _subscribe():
                flowcontrol = FlowControlSettings(
                        messages_outstanding=self._backlog,
                        bytes_outstanding=1024 * 1024 * 10)
    
                subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials)
                with subscriber:
                    fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol)
                    logger.info('subscribe sent to gcp')
    
                    while True:
                        if self._cancel_requested:
                            fut.cancel()
                            fut.result()
                            while True:
                                while not messages.empty():
                                    try:
                                        _ = messages.get_nowait()
                                    except queue.Empty:
                                        break
                                try:
                                    messages.put_nowait(self._NOMORE)
                                    break
                                except queue.Full:
                                    continue
                            break
                        time.sleep(0.003)
    
            self._subscribe_task = self._pool.submit(_subscribe)
            return self
    
        def __exit__(self, *args, **kwargs):
            if self._pool is not None:
                if self._subscribe_task is not None:
                    self._cancel_requested = True
                    while True:
                        z = self._messages.get()
                        if z is self._NOMORE:
                            break
                    self._subscribe_task.result()
                    self._subscribe_task = None
                    self._messages = None
                self._pool.__exit__(*args, **kwargs)
                self._pool = None
    
        def get(self, timeout=None):
            if timeout is not None and timeout == 0:
                msg = self._messages.get_nowait()
            else:
                msg = self._messages.get(block=True, timeout=timeout)
            data = pickle.loads(msg.data)
            msg.ack()
            return data
    
    
    def get_account() -> Account:
        return Account(project_id='--fill-in-proj-id--',
                       region='us-central1',
                       zone='us-central1-a',
                       credentials='--fill-in-creds--')
    
    
    # This test shows that it takes extremely long to get the first messsage
    # in `subscribe`.
    def test1(account):
        name = 'test-' + str(uuid.uuid4())
        topic = account.create_topic(name)
        try:
            with topic.get_publisher() as p:
                p.put(1)
                p.put(2)
                p.put(3)
    
            sub = topic.create_subscription(name)
            try:
                with sub.get_subscriber() as s:
                    t0 = time.time()
                    logger.info('getting the first message')
                    z = s.get()
                    t1 = time.time()
                    logger.info('  got the first message')
                    print(z)
                print('getting the first msg took', t1 - t0, 'seconds')
            finally:
                topic.delete_subscription(name)
        finally:
            account.delete_topic(name)
    
    
    def test2(account):
        name = 'test-' + str(uuid.uuid4())
        topic = account.create_topic(name)
        N = 30
        try:
            with topic.get_publisher(batch_size=1) as p:
                for i in range(N):
                    p.put(i)
    
            sub = topic.create_subscription(name)
            try:
                with sub.get_subscriber() as s:
                    for i in range(10):
                        z = s.get()
                        assert z == i
    
                # The following block shows that the subscriber
                # resets to the first message, not as expected
                # that it picks up where the last block left.
    
                with sub.get_subscriber() as s:
                    for i in range(10, 20):
                        z = s.get()
                        try:
                            assert z == i
                        except AssertionError as e:
                            print(z, '!=', i)
                            return
            finally:
                topic.delete_subscription(name)
        finally:
            account.delete_topic(name)
    
    
    if __name__ == '__main__':
        a = get_account()
        try:
            a.create_reservation('default')
        except AlreadyExists:
            pass
    
        test1(a)
        print('')
        test2(a)

1

1 Answers

0
votes

I was not able to recreate your issue but I think you should check the way its being handled on the official documentation about using cloud pubsublite.

This is the code I extract and update from Receiving messages sample and It works as intended, it will get the message from the lite-topic and acknowledge to avoid getting it again. if rerun, I will only get the data if there is data to pull. I added the code so you can check if something may differ from your code.

consumer.py

from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
    MessageMetadata, 
)
from google.cloud.pubsub_v1.types import PubsubMessage

# TODO(developer):
project_number = project-number
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
timeout = 90

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

The only way I hit your scenario is when I use different subscriptions. But on that regard, when different subscriptions get message from the topic each one will receive the same stored messages as explained on Receiving messages from Lite subscriptions.

Consider this: