5
votes

In my scenario I'm scheduling tasks using PubSub. That is up to 2,000 PubSub messages, which are then consumed by a Python script that runs inside a Docker container on Google Compute Engine.

Processing each message takes between 30 seconds and 5 minutes. Therefore the acknowledgement deadline is set to 600 seconds (10 minutes).

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

def handle_message(message: Message):
    # do your stuff here (max. 600sec)
    message.ack()
    return

def receive_messages(project, subscription_name):

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)

    future = subscription.open(handle_message)

    # Blocks the thread while messages are coming in through the stream. Any
    # exceptions that crop up on the thread will be set on the future.
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    try:
        future.result()
    except Exception as e:
        # do some logging
        raise

Because I'm working through so many PubSub messages, I'm creating a template for a Compute Engine instance group that uses auto-scaling in either of these two ways:

gcloud compute instance-groups managed create my-worker-group \
  --zone=europe-west3-a \
  --template=my-worker-template \
  --size=0

gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --target-cpu-utilization=0.4

gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --update-stackdriver-metric=pubsub.googleapis.com/subscription/num_undelivered_messages \
  --stackdriver-metric-filter="resource.type = pubsub_subscription AND resource.label.subscription_id = my-pubsub-subscription" \
  --stackdriver-metric-single-instance-assignment=10

So far, so good. Option one scales up to about 8 instances, while option two starts the maximum number of instances. However, I noticed some strange behavior, which is why I'm posting here. Maybe you can help me out?
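To illustrate why option two jumps straight to the maximum, here is a rough sketch of the arithmetic behind a per-instance assignment. It assumes the autoscaler simply divides the backlog by the single-instance assignment and clamps the result to the configured bounds; the real autoscaler also applies stabilization and cool-down logic that is not modeled here. The function name target_instances is made up for this example.

```python
import math


def target_instances(backlog, per_instance, min_replicas=0, max_replicas=50):
    """Simplified model (an assumption, not the real autoscaler):
    one instance per `per_instance` undelivered messages, clamped to the
    configured minimum and maximum number of replicas."""
    wanted = math.ceil(backlog / per_instance)
    return max(min_replicas, min(max_replicas, wanted))


# 2,000 undelivered messages at 10 messages per instance would ask for
# 200 instances, which is capped at the configured maximum of 50.
print(target_instances(2000, 10))
```

Under this model the group only starts to shrink once the backlog drops below 500 messages.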

Message duplicates: It seems that the PubSub client in each instance (the Python script inside the Docker container on Compute Engine) reads a batch of messages (~10) into something like a buffer and hands them to my code. It looks like all instances that spin up at the same time read the same messages (the first 10 of 2,000) and start working on the same stuff. In my logs I see that most messages are processed 3 times by different machines. I was expecting PubSub to know when one subscriber has buffered 10 messages, so that another subscriber buffers 10 different messages and not the same ones.

Acknowledgement deadline: Because of the buffering, the messages at the end of the buffer (let's say message 8 or 9) have to wait in the buffer until the preceding messages (messages 1 to 7) have been processed. The sum of that waiting time plus the message's own processing time may exceed the deadline of 600 seconds.
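A small worked example of that waiting-time effect. It assumes, as the paragraph above does, that the buffered messages are processed one after another and that the deadline clock for every message starts when the batch is received; the helper names are made up for this sketch.

```python
from itertools import accumulate


def completion_times(processing_seconds):
    """Seconds after batch arrival at which each buffered message finishes,
    assuming strictly sequential processing in buffer order."""
    return list(accumulate(processing_seconds))


def late_messages(processing_seconds, ack_deadline=600):
    """1-based positions of the messages that finish after the deadline."""
    times = completion_times(processing_seconds)
    return [pos for pos, t in enumerate(times, start=1) if t > ack_deadline]


# Ten buffered messages that each take 2 minutes to process:
# message 5 finishes exactly at 600 s, messages 6 to 10 finish too late.
print(late_messages([120] * 10))
```

So even with a processing time well below the deadline, half of a buffer of ten would be redelivered under these assumptions.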

Load-Balancing: Because each machine buffers so many messages, the load is consumed by just a few instances while other instances are completely idle. This happens for the scaling-option two that uses the PubSub stackdriver metric.

People told me that I need to implement a manual synchronization service using Cloud SQL or something else in which each instance indicates on which message it is working, so that other instances won't start the same. But I feel that can't be true - because then I don't get the idea what PubSub is all about.

pubsub behavior

Update: I found a nice explanation from 2015 by Gregor Hohpe, co-author of the book Enterprise Integration Patterns. My initial observation was actually wrong, but the side effects I observed are real.

Google Cloud Pub/Sub API actually implements both the Publish-Subscribe Channel and the Competing Consumers patterns. At the heart of the Cloud Pub/Sub is a classic Publish-Subscribe Channel, which delivers a single message published to it to multiple subscribers. One advantage of this pattern is that adding subscribers is side-effect free, which is one reason a Publish-Subscribe Channel is sometimes considered more loosely coupled than a Point-to-Point Channel, which delivers a message to exactly one subscriber. Adding consumers to a Point-to-Point Channel results in Competing Consumers and thus has a strong side effect.

Copyright: Gregor Hohpe, co-author of the book Enterprise Integration Patterns. 2015.

The side effects I observed are about the message buffering and message flow control in each of the subscribers (which all subscribed to the same subscription, point-to-point == competing consumers). The current version of the Python client library wraps the PubSub REST API (and RPCs). If that wrapper is used, there is no control over:

  • How many containers are started on one VM; multiple containers may be started if the CPU is not yet fully utilized
  • How many messages are pulled from the subscription at once (buffering); no control at all
  • How many threads for processing the pulled messages are started inside one container; flow_control(max_messages) has no effect if the value is below a fixed value.

The side effects we observed are:

  1. One consumer pulls a high number of messages at once (approximately 100 to 1,000) and queues them in its client buffer. Therefore all other VMs that are started according to the auto-scaling rule do not receive any messages, because all messages are in the queues of the first few VMs.
  2. Messages are redelivered, either to the same VM or to any other VM (or Docker container), if the acknowledgement deadline expires. Therefore you need to modify the acknowledgement deadline while processing the message. The deadline counter starts when the processing starts.
  3. Assuming that the processing of the message is a long-running task (for instance machine learning), you may either
    • Acknowledge the message upfront, but this will cause the VM to be shut down by the auto-scaling rule if there is no further message waiting. The rule does not care whether the CPU utilization is still high and the processing has not yet finished.
    • Acknowledge the message after processing. In this case you need to modify the acknowledgement deadline of that specific message while processing it. No single block of code may run longer than the deadline since the last modification.
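For the second variant, a minimal sketch of extending the deadline from a background thread while the actual work runs. It only relies on the message object offering modify_ack_deadline(seconds), ack() and nack(), which the Python client's Message class provides; the function name process_with_deadline_extension, the work callable and the interval values are assumptions for illustration. Newer versions of the client library may already extend the lease automatically, so check whether you need this at all.

```python
import threading


def process_with_deadline_extension(message, work, extend_by=600, interval=300):
    """Run work(message) and keep pushing the ack deadline out meanwhile.

    `work`, `extend_by` and `interval` are hypothetical parameters chosen
    for this sketch; `interval` should be well below `extend_by`.
    """
    done = threading.Event()

    def keep_alive():
        # Event.wait() returns False on timeout and True once `done` is set,
        # so the loop extends the deadline every `interval` seconds.
        while not done.wait(interval):
            message.modify_ack_deadline(extend_by)

    extender = threading.Thread(target=keep_alive, daemon=True)
    extender.start()
    try:
        work(message)
    except Exception:
        message.nack()  # hand the message back for redelivery
        raise
    else:
        message.ack()  # acknowledge only after the work succeeded
    finally:
        done.set()
        extender.join()
```

Because the extension happens in its own thread, a single long-running call inside work() no longer risks exceeding the deadline on its own.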

Possible solutions that haven't been looked into:

  • Using the Java Client Library since it comes with better controls on pulling and consuming messages
  • Using the underlying API calls and classes of the Python Client Library
  • Building a synchronization storage that orchestrates the competing consumers
2
I am not sure why you are handling the message pull by calling the open() method instead of using subscriber.subscribe() as the only future and passing it the callback function you want to call for each message. The documentation offers a specific example of a pull subscription using FlowControl, and it does not require the use of open(). Then, you can maybe specify a lower limit of max_messages in order to increase throughput. – dsesto
Good point. I will try that. I thought open() was there to block the main thread, but future.result() will block anyway. – Matthias
Also, I understand that the result() method blocks execution while messages are coming in, while in reality you want the subscriber to be non-blocking so that it can process messages in the background, so I would remove that part of the code too. Is there any specific reason why you are using (1) open() and (2) result()? I think the issue may be related to how you are handling the background execution of the message processing in your callback function, which should be non-blocking. Finally, which version of the library are you using? The latest is 0.34.0. – dsesto
I am not sure what the open() method does, as I am not able to find any reference or documentation for it, but from your use case I understand you want non-blocking calls. Please correct me if I misunderstood the situation. – dsesto
We are using the latest version. Just updated today. But it looks like the API has changed recently. In one of the previous versions there was no way to give the callback to the subscribe method, so you had to use open(). I will check the new API though. The future.result() is used to catch any global exception as described here. – Matthias

2 Answers

1
votes

I think there are two main ways you can deal with this.

1) Instead of pushing to your worker processes directly, push to a load balancer.

or

2) Have your worker processes pull the requests rather than pushing them to the workers.

See the "Load Balancing" section under "Push and Pull delivery" in

https://cloud.google.com/pubsub/docs/subscriber

0
votes

There are lots of config options for the Python client library: https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/client.html#google.cloud.pubsub_v1.subscriber.client.Client.subscribe

In particular, you want to look at flow_control and scheduler. Important quote:

The flow_control argument can be used to control the rate of at which messages are pulled. The settings are relatively conservative by default to prevent “message hoarding” - a situation where the client pulls a large number of messages but can not process them fast enough leading it to “starve” other clients of messages. Increasing these settings may lead to faster throughput for messages that do not take a long time to process.
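As a rough sketch of what that could look like, here is a subscriber that passes the callback and a small flow_control limit directly to subscribe(). It assumes a version of google-cloud-pubsub in which subscribe() accepts callback and flow_control; max_messages=1 is just an illustrative value for long-running tasks, and the import sits inside the function only so that the callback can be tried out without the library installed.

```python
def handle_message(message):
    # Do the actual work here, then acknowledge the message.
    message.ack()


def receive_messages(project, subscription_name, max_messages=1):
    # Requires the google-cloud-pubsub package.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    # Limit how many messages this client holds at once, so that other
    # consumers of the same subscription are not starved.
    flow_control = pubsub_v1.types.FlowControl(max_messages=max_messages)
    future = subscriber.subscribe(
        subscription_path, callback=handle_message, flow_control=flow_control
    )

    # subscribe() returns immediately; result() keeps the main thread alive
    # and re-raises errors from the background stream.
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()
```

Whether a low max_messages is actually honored depends on the library version, as discussed in the question.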

Also, you can control ack_deadline_seconds of a Subscription: https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.Subscription