
We're using PubSub in production and are seeing more VMs handling PubSub messages than we would expect.

I ran a simple PubSub test overnight, and it looks like the rate-limiting (flow control) mechanism doesn't behave the way we expected.

Here is the test:

  1. Publish a batch of messages to a topic with a pull subscription. In this experiment there were about 2.7k messages (the test started at approximately 9 pm).
  2. Configure one async client that uses a StreamingPull connection with FlowControl set to 2.
  3. Simulate message handling that takes roughly 5 seconds by running it on a timer and acknowledging the message only when the timer fires.

Expected result: messages are consumed from PubSub at a steady rate of 2 messages every 5 seconds. A small delay between acking a message and pulling the next one is expected because of network and processing overhead.
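If the client behaved as expected, the drain time is simple arithmetic: 2 concurrent slots at 5 s per message give 0.4 messages/s, so about 2,700 messages should take roughly 2,700 / 0.4 = 6,750 s (about 1.9 hours). The sketch below just encodes that back-of-the-envelope model; the class name is hypothetical, it is not part of any PubSub API, and it ignores network and ack latency.

```java
// Back-of-the-envelope model of the *expected* behavior: a fixed number of
// concurrent slots, each busy for a fixed handling time, draining a backlog.
final class ExpectedDrain {
    private ExpectedDrain() {}

    /** Messages acked per second when every slot is always busy. */
    static double throughputPerSecond(int concurrency, double handlingSeconds) {
        return concurrency / handlingSeconds;
    }

    /** Seconds needed to drain the backlog, ignoring network overhead. */
    static double drainSeconds(long backlog, int concurrency, double handlingSeconds) {
        // Each "round" handles `concurrency` messages in `handlingSeconds`;
        // a partially filled last round still costs a full handling time.
        long rounds = (backlog + concurrency - 1) / concurrency; // ceiling division
        return rounds * handlingSeconds;
    }
}
```

Any gap much larger than this estimate between the expected and observed drain time points to time spent outside the handler, which is what the long pauses in the test suggest.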

Actual result: PubSub starts throttling (or something similar) with very long pauses, during which no messages arrive at all. The length of each pause depends on the number of unacked messages in the subscription.

The FlowControl docs don't explain this behavior.

[Chart: PubSub subscription unacked message count]

Here is the consumer (client) code:

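    import com.google.api.core.ApiService;
    import com.google.api.gax.batching.FlowControlSettings;
    import com.google.api.gax.batching.FlowController;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.common.util.concurrent.MoreExecutors;

    // Max number of messages outstanding at once (delivered, not yet acked/nacked)
    var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
    var flowSettings = FlowControlSettings.newBuilder()
      .setMaxOutstandingElementCount(concurrentFlowsNumber)
      // Wait instead of throwing when the limit is reached
      .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
      .build();

    var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
      .setCredentialsProvider(() -> serviceAccountCredentials)
      .setFlowControlSettings(flowSettings)
      .build();

    subscriber.addListener(
      new Subscriber.Listener() {
        @Override
        public void failed(ApiService.State from, Throwable failure) {
          logger.error(failure);
        }
      },
      MoreExecutors.directExecutor());

    var apiService = subscriber.startAsync();
    apiService.addListener(new ApiService.Listener() {
      @Override
      public void running() {
        logger.info("Pubsub started");
      }

      @Override
      public void failed(ApiService.State from, Throwable failure) {
        // Pass the throwable too, so the cause is logged, not just the state
        logger.error("Pubsub failed on step: {}", from, failure);
      }
    }, Runnable::run);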

And the message handler is:

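    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.pubsub.v1.PubsubMessage;
    import java.util.Random;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    private static final Random rand = new Random();
    // One shared scheduler instead of a new Timer (and thread) per message
    private static final ScheduledExecutorService ackScheduler = Executors.newSingleThreadScheduledExecutor();

    private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Simulated work: ack after a random 3 to 8 s delay; until then the
      // message counts against maxOutstandingElementCount
      ackScheduler.schedule(consumer::ack, 3000L + rand.nextInt(5000), TimeUnit.MILLISECONDS);
    }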
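Roughly speaking, maxOutstandingElementCount with LimitExceededBehavior.Block acts like a counting semaphore on the client: a message takes a slot when it is handed to the receiver and gives it back when ack() or nack() is called, which in this handler happens only after the timer delay. The class below is a hypothetical illustration of that idea built on java.util.concurrent.Semaphore; it is not the library's FlowController, and it models only the client-side limit, not how many messages the server has already streamed to the client.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model of "at most `limit` messages outstanding, block when full".
// Illustration only; NOT the Pub/Sub client's internal FlowController.
final class OutstandingLimiter {
    private final Semaphore slots;

    OutstandingLimiter(int limit) {
        this.slots = new Semaphore(limit);
    }

    /** Blocks until a slot is free, analogous to LimitExceededBehavior.Block. */
    void acquireForDelivery() throws InterruptedException {
        slots.acquire();
    }

    /** Non-blocking variant: returns false if the limit is already reached. */
    boolean tryAcquireForDelivery() {
        return slots.tryAcquire();
    }

    /** Called when the handler acks or nacks a message; frees one slot. */
    void release() {
        slots.release();
    }

    int freeSlots() {
        return slots.availablePermits();
    }
}
```

With a limit of 2, a third message cannot be handed to the receiver until one of the first two is acked, which is the per-VM concurrency cap the question is after. It does not by itself stop other messages from waiting in the client's buffer.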

So, does anyone know how to make the clients (many VMs) consume messages with a concurrency limit (up to 4 messages handled at a time) without these long pauses?

P.S. These questions are similar, but not the same: "Google pubsub flow control", "pubsub Dynamic rate limiting", "Cloud pubsub slow poll rate".


1 Answer


Since a backlog has built up, you might be running into this issue: https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages

With StreamingPull, your undelivered messages can get buffered between the Pub/Sub service and the client library. Messages might get stuck in a single client's buffer, or get redelivered to the same client if the ackDeadline was exceeded.
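A rough model of why that buffering hurts here: if one client already holds a queue of messages but processes only `concurrency` at a time, message number p waits about floor(p / concurrency) × handlingSeconds before its handler even starts, while other VMs may have nothing to do. The sketch below counts how many buffered messages would wait longer than a given deadline. The class is hypothetical, and the 100-message buffer and 60 s deadline used with it are made-up numbers; the real buffer size and deadline handling depend on the service and the client library version.

```java
// Hypothetical model: messages already received by ONE client wait in its
// local buffer and are handed to the handler `concurrency` at a time.
final class BufferWait {
    private BufferWait() {}

    /** Seconds message #position (0-based) waits before its handler starts. */
    static double waitSeconds(int position, int concurrency, double handlingSeconds) {
        // Integer division gives the "round" this message is handled in.
        return (position / concurrency) * handlingSeconds;
    }

    /** How many of `buffered` messages wait longer than `deadlineSeconds`. */
    static int countOverDeadline(int buffered, int concurrency,
                                 double handlingSeconds, double deadlineSeconds) {
        int over = 0;
        for (int p = 0; p < buffered; p++) {
            if (waitSeconds(p, concurrency, handlingSeconds) > deadlineSeconds) {
                over++;
            }
        }
        return over;
    }
}
```

For example, with 100 buffered messages, concurrency 2, 5 s per message and a 60 s deadline, messages from position 26 onward wait 65 s or more, so 74 of the 100 would outlive that deadline and become candidates for redelivery.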

As that page suggests, you can experiment with synchronous pull instead.
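The point of synchronous pull here is that each worker asks only for as many messages as it can handle right now, so nothing sits in a local buffer while other VMs are idle. Below is a library-independent sketch of that loop. `PullSource` is a hypothetical interface standing in for a Pull RPC with a max-messages limit; it is not a real client type, and wiring it to the actual PubSub API is left out. Acking the whole batch after the slowest message finishes is a simplification chosen for clarity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for a synchronous Pull RPC (not a real library type).
interface PullSource {
    /** Returns at most maxMessages messages; may return fewer or none. */
    List<String> pull(int maxMessages);

    /** Acknowledges the given messages so they are not redelivered. */
    void ack(List<String> messages);
}

// Pulls no more messages than it can process concurrently.
final class SyncPullWorker implements AutoCloseable {
    private final PullSource source;
    private final int concurrency;
    private final Consumer<String> handler;
    private final ExecutorService pool;

    SyncPullWorker(PullSource source, int concurrency, Consumer<String> handler) {
        this.source = source;
        this.concurrency = concurrency;
        this.handler = handler;
        this.pool = Executors.newFixedThreadPool(concurrency);
    }

    /** Pulls one batch, handles it in parallel, then acks it; returns the batch size. */
    int runOnce() {
        List<String> batch = source.pull(concurrency); // never more than we can handle
        if (batch.isEmpty()) {
            return 0;
        }
        List<Future<?>> results = new ArrayList<>();
        for (String message : batch) {
            results.add(pool.submit(() -> handler.accept(message)));
        }
        try {
            for (Future<?> result : results) {
                result.get(); // wait for every handler in the batch
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handling batch", e);
        } catch (ExecutionException e) {
            // Batch is not acked, so it becomes eligible for redelivery later.
            throw new IllegalStateException("handler failed", e.getCause());
        }
        source.ack(batch);
        return batch.size();
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}
```

Each VM would call `runOnce()` in a loop with `concurrency` set to its own limit (for example 4); acking each message as soon as its handler finishes would reduce the wait on slow messages at the cost of a bit more bookkeeping.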