I'm using Google Pub/Sub Lite with the Python client library, on a small dummy topic with a single partition and a few messages. I do the standard `SubscriberClient.subscribe`
with a callback. The callback puts each message in a queue. When a message is taken out of the queue for consumption, its `ack()`
is called. When I want to stop, I call `subscribe_future.cancel(); subscribe_future.result()`
and discard the unconsumed messages still in the queue.
Say I know the topic has 30 messages, and I consume 10 of them before stopping. Then I start a new SubscriberClient
on the same subscription and receive messages. I expect to receive messages starting with the 11th, but I get them starting with the first. So the previous subscriber has acked the first 10, yet it's as if the server never received the acknowledgements.
I thought maybe the acks need some time to reach the server, so I waited 2 minutes before starting the second subscriber. That didn't help.
Then I thought maybe the subscriber object batches the ack calls and I need to "flush" them before cancelling, but I couldn't find anything about that.
What am I missing? Thanks.
Here's the code. If you have a Pub/Sub Lite account, the code is executable after you fill in your project ID and credentials. The code shows two issues: one is the subject of this question; the other is asked in a separate question.
# Using python 3.8 from __future__ import annotations import logging import pickle import queue import time import uuid from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from typing import Union, Optional from google.api_core.exceptions import AlreadyExists from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite import AdminClient, PubSubMessage from google.cloud.pubsublite import Reservation as GCPReservation from google.cloud.pubsublite import Subscription as GCPSubscription from google.cloud.pubsublite import Topic as GCPTopic from google.cloud.pubsublite.cloudpubsub import (PublisherClient, SubscriberClient) from google.cloud.pubsublite.types import (BacklogLocation, CloudZone, LocationPath, ReservationPath, SubscriptionPath, TopicPath, ) from google.cloud.pubsublite.types import FlowControlSettings from google.oauth2.service_account import Credentials logging.getLogger('google.cloud').setLevel(logging.WARNING) logger = logging.getLogger(__name__) FORMAT = '[%(asctime)s.%(msecs)03d %(name)s] %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') class Account: def __init__(self, project_id: str, region: str, zone: str, credentials: Credentials, ): self.project_id = project_id self.region = region self.zone = CloudZone.parse(zone) self.credentials = credentials self.client = AdminClient(region=region, credentials=credentials) def location_path(self) -> LocationPath: return LocationPath(self.project_id, self.zone) def reservation_path(self, name: str) -> ReservationPath: return ReservationPath(self.project_id, self.region, name) def topic_path(self, name: str) -> TopicPath: return TopicPath(self.project_id, self.zone, name) def subscription_path(self, name: str) -> SubscriptionPath: return SubscriptionPath(self.project_id, self.zone, name) def create_reservation(self, name: str, *, capacity: int = 32) -> None: path = self.reservation_path(name) reservation 
= GCPReservation(name=str(path), throughput_capacity=capacity) self.client.create_reservation(reservation) # logger.info('reservation %s created', name) def create_topic(self, name: str, *, partition_count: int = 1, partition_size_gib: int = 30, reservation_name: str = 'default') -> Topic: # A topic name can not be reused within one hour of deletion. top_path = self.topic_path(name) res_path = self.reservation_path(reservation_name) topic = GCPTopic( name=str(top_path), partition_config=GCPTopic.PartitionConfig(count=partition_count), retention_config=GCPTopic.RetentionConfig( per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024), reservation_config=GCPTopic.ReservationConfig( throughput_reservation=str(res_path))) self.client.create_topic(topic) # logger.info('topic %s created', name) return Topic(name, self) def delete_topic(self, name: str) -> None: path = self.topic_path(name) self.client.delete_topic(path) # logger.info('topic %s deleted', name) def get_topic(self, name: str) -> Topic: return Topic(name, self) class Topic: def __init__(self, name: str, account: Account): self.account = account self.name = name self._path = self.account.topic_path(name) def create_subscription(self, name: str, *, pos: str = None) -> Subscription: path = self.account.subscription_path(name) if pos is None or pos == 'beginning': starting_offset = BacklogLocation.BEGINNING elif pos == 'end': starting_offset = BacklogLocation.END else: raise ValueError( 'Argument start only accepts one of two values - "beginning" or "end"' ) Conf = GCPSubscription.DeliveryConfig subscription = GCPSubscription( name=str(path), topic=str(self._path), delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY)) self.account.client.create_subscription(subscription, starting_offset) # logger.info('subscription %s created for topic %s', name, self.name) return Subscription(name, self) def delete_subscription(self, name: str) -> None: path = 
self.account.subscription_path(name) self.account.client.delete_subscription(path) # logger.info('subscription %s deleted from topic %s', name, self.name) def get_subscription(self, name: str): return Subscription(name, self) @contextmanager def get_publisher(self, **kwargs): with Publisher(self, **kwargs) as pub: yield pub class Publisher: def __init__(self, topic: Topic, *, batch_size: int = 100): self.topic = topic self._batch_config = { 'max_bytes': 3 * 1024 * 1024, # 3 Mb; must be None: self._messages.put(data) class Subscription: def __init__(self, name: str, topic: Topic): self.topic = topic self.name = name self._path = topic.account.subscription_path(name) @contextmanager def get_subscriber(self, *, backlog=None): with Subscriber(self, backlog=backlog) as sub: yield sub class Subscriber: def __init__(self, subscription: Subscription, backlog: int = None): self.subscription = subscription self._backlog = backlog or 100 self._cancel_requested: bool = None self._messages: queue.Queue = None self._pool: ThreadPoolExecutor = None self._NOMORE = object() self._subscribe_task = None def __enter__(self): self._pool = ThreadPoolExecutor(1).__enter__() self._messages = queue.Queue(self._backlog) messages = self._messages def callback(msg: PubSubMessage): logger.info('got %s', pickle.loads(msg.data)) messages.put(msg) def _subscribe(): flowcontrol = FlowControlSettings( messages_outstanding=self._backlog, bytes_outstanding=1024 * 1024 * 10) subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials) with subscriber: fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol) logger.info('subscribe sent to gcp') while True: if self._cancel_requested: fut.cancel() fut.result() while True: while not messages.empty(): try: _ = messages.get_nowait() except queue.Empty: break try: messages.put_nowait(self._NOMORE) break except queue.Full: continue break time.sleep(0.003) self._subscribe_task = self._pool.submit(_subscribe) return 
self def __exit__(self, *args, **kwargs): if self._pool is not None: if self._subscribe_task is not None: self._cancel_requested = True while True: z = self._messages.get() if z is self._NOMORE: break self._subscribe_task.result() self._subscribe_task = None self._messages = None self._pool.__exit__(*args, **kwargs) self._pool = None def get(self, timeout=None): if timeout is not None and timeout == 0: msg = self._messages.get_nowait() else: msg = self._messages.get(block=True, timeout=timeout) data = pickle.loads(msg.data) msg.ack() return data def get_account() -> Account: return Account(project_id='--fill-in-proj-id--', region='us-central1', zone='us-central1-a', credentials='--fill-in-creds--') # This test shows that it takes extremely long to get the first messsage # in `subscribe`. def test1(account): name = 'test-' + str(uuid.uuid4()) topic = account.create_topic(name) try: with topic.get_publisher() as p: p.put(1) p.put(2) p.put(3) sub = topic.create_subscription(name) try: with sub.get_subscriber() as s: t0 = time.time() logger.info('getting the first message') z = s.get() t1 = time.time() logger.info(' got the first message') print(z) print('getting the first msg took', t1 - t0, 'seconds') finally: topic.delete_subscription(name) finally: account.delete_topic(name) def test2(account): name = 'test-' + str(uuid.uuid4()) topic = account.create_topic(name) N = 30 try: with topic.get_publisher(batch_size=1) as p: for i in range(N): p.put(i) sub = topic.create_subscription(name) try: with sub.get_subscriber() as s: for i in range(10): z = s.get() assert z == i # The following block shows that the subscriber # resets to the first message, not as expected # that it picks up where the last block left. 
with sub.get_subscriber() as s: for i in range(10, 20): z = s.get() try: assert z == i except AssertionError as e: print(z, '!=', i) return finally: topic.delete_subscription(name) finally: account.delete_topic(name) if __name__ == '__main__': a = get_account() try: a.create_reservation('default') except AlreadyExists: pass test1(a) print('') test2(a)