In my scenario I'm scheduling tasks using PubSub. This is up to 2,000 PubSub messages that are then consumed by a Python script that runs inside a Docker container on Google Compute Engine.
Processing each message takes about 30 seconds to 5 minutes. Therefore the acknowledgement deadline is set to 600 seconds (10 minutes).
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message


def handle_message(message: Message):
    # do your stuff here (max. 600sec)
    message.ack()
    return


def receive_messages(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)
    future = subscription.open(handle_message)

    # Blocks the thread while messages are coming in through the stream. Any
    # exceptions that crop up on the thread will be set on the future.
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    try:
        future.result()
    except Exception as e:
        # do some logging
        raise
Because I'm working on so many PubSub messages, I created an instance template and a managed instance group on Compute Engine that auto-scales in either of these two ways:

# Create the managed instance group (used by both options)
gcloud compute instance-groups managed create my-worker-group \
  --zone=europe-west3-a \
  --template=my-worker-template \
  --size=0

# Option one: scale on CPU utilization
gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --target-cpu-utilization=0.4

# Option two: scale on the number of undelivered PubSub messages
gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --update-stackdriver-metric=pubsub.googleapis.com/subscription/num_undelivered_messages \
  --stackdriver-metric-filter="resource.type = pubsub_subscription AND resource.label.subscription_id = my-pubsub-subscription" \
  --stackdriver-metric-single-instance-assignment=10
So far, so good. Option one scales up to about 8 instances while the second option will start the maximum number of instances. Now I figured out that some strange things happen and this is why I'm posting here. Maybe you can help me out?!
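Why option two jumps straight to the maximum can be estimated with simple arithmetic. This is only a rough sketch: it assumes the autoscaler aims for roughly one instance per `single-instance-assignment` undelivered messages and clamps the result to the configured bounds. The function name and the rounding are my own assumptions, not the autoscaler's documented algorithm.

```python
import math


def desired_instances(backlog, per_instance, min_replicas=0, max_replicas=50):
    """Rough estimate of the group size for a backlog-based scaling rule."""
    # Assumption: one instance is requested per `per_instance` undelivered messages.
    wanted = math.ceil(backlog / per_instance)
    # The result can never leave the [min_replicas, max_replicas] range.
    return max(min_replicas, min(max_replicas, wanted))


print(desired_instances(2000, 10))  # 2,000 messages would need 200 instances -> capped at 50
print(desired_instances(40, 10))    # a small backlog -> 4
```

With 2,000 queued messages and an assignment of 10 per instance, the estimate is far above the cap of 50, which would match what I see.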
Message duplicates: It seems that the PubSub client in each instance (Python script inside the Docker container within Compute Engine) reads a batch of messages (~10) into something like a buffer and hands them to my code. It looks like all instances that spin up at the same time read the same messages (the first 10 of 2,000) and start working on the same stuff. In my logs I see that most messages are processed 3 times by different machines. I was expecting PubSub to know that one subscriber has buffered 10 messages, so that another subscriber would buffer 10 different messages and not the same ones.
Acknowledgement deadline: Because of the buffering, the messages at the end of the buffer (let's say message 8 or 9) have to wait until the preceding messages (messages 1 to 7) have been processed. The sum of that waiting time plus their own processing time may exceed the deadline of 600 seconds.
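To make the deadline concern concrete, here is a small worked example. It assumes the deadline clock starts when the batch is delivered and that buffered messages are processed strictly one after another. Both are simplifying assumptions for illustration (the helper name is made up), not a statement about how the client library actually schedules work.

```python
from itertools import accumulate

ACK_DEADLINE = 600  # seconds, as configured on the subscription


def late_messages(durations, deadline=ACK_DEADLINE):
    """Return 1-based buffer positions whose processing ends after the deadline."""
    # Running total = waiting time in the buffer plus own processing time.
    finish_times = accumulate(durations)
    return [pos for pos, done in enumerate(finish_times, start=1) if done > deadline]


# Ten buffered messages that each take 90 seconds to process.
print(late_messages([90] * 10))  # → [7, 8, 9, 10]
```

Under these assumptions, even moderately long tasks push the tail of the buffer past the 10-minute deadline.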
Load balancing: Because each machine buffers so many messages, the load is consumed by just a few instances while other instances are completely idle. This happens with scaling option two, which uses the PubSub Stackdriver metric.
People told me that I need to implement a manual synchronization service using Cloud SQL or something else in which each instance indicates which message it is working on, so that other instances won't start on the same one. But I feel that can't be true - because then I don't understand what PubSub is all about.
Update: I found a nice explanation from 2015 by Gregor Hohpe, co-author of the book Enterprise Integration Patterns. Actually my observation was wrong, but the observed side effects are real.
Google Cloud Pub/Sub API actually implements both the Publish-Subscribe Channel and the Competing Consumers patterns. At the heart of the Cloud Pub/Sub is a classic Publish-Subscribe Channel, which delivers a single message published to it to multiple subscribers. One advantage of this pattern is that adding subscribers is side-effect free, which is one reason a Publish-Subscribe Channel is sometimes considered more loosely coupled than a Point-to-Point Channel, which delivers a message to exactly one subscriber. Adding consumers to a Point-to-Point Channel results in Competing Consumers and thus has a strong side effect.
The side effects I observed are about the message buffering and message flow control in each of the subscribers (who subscribed to the same subscription, point-to-point == competing consumers). The current version of the Python Client Lib wraps the PubSub REST API (and RPCs). If that wrapper is used, there is no control over:
- How many containers are started on one VM; multiple containers may be started if the CPU is not yet fully utilized
- How many messages are pulled from the subscription at once (buffering); no control at all
- How many threads for processing the pulled messages are started inside one container; flow_control(max_messages) has no effect if the value is below a fixed value.
The side effects we observed are:
- One consumer pulls a high number of messages at once (approximately 100 to 1,000) and queues them in its client buffer. Therefore all other VMs that are started according to the auto-scaling rule do not receive any messages, because all messages are in the queues of the first few VMs
- Messages are re-delivered either to the same VM or to any other VM (or Docker container) if processing runs into the acknowledgement deadline. Therefore you need to modify the acknowledgement deadline while processing the message. The deadline counter starts when the processing starts.
- Assuming that the processing of the message is a long running task (for instance machine learning), you may
  - Acknowledge the message upfront, but this will cause the VM to be shut down by the auto-scaling rule if there is no further message waiting. The rule does not care if the CPU utilization is still strong and the processing has not yet been finished.
  - Acknowledge the message after processing. In this case you need to modify the acknowledgement deadline of that specific message while processing that message. There must not be one single code block that breaks the deadline since the last modification.
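The "acknowledge after processing" variant could look roughly like the sketch below: a background thread keeps pushing the deadline out while the actual work runs. It relies only on the message object offering `modify_ack_deadline(seconds)` and `ack()`, as the client library's `Message` class does. The helper name, the `work` callable and the timing values are assumptions for illustration, and I have not verified this against a real subscription.

```python
import threading


def process_with_deadline_extension(message, work, extend_by=60, interval=30):
    """Run work(message) while periodically extending the ack deadline."""
    done = threading.Event()

    def keep_alive():
        # Event.wait() returns False on timeout, i.e. while work is still running.
        while not done.wait(interval):
            message.modify_ack_deadline(extend_by)

    heartbeat = threading.Thread(target=keep_alive, daemon=True)
    heartbeat.start()
    try:
        result = work(message)
    finally:
        # Stop the heartbeat whether the work succeeded or raised.
        done.set()
        heartbeat.join()
    # Reached only if work() did not raise, so failed messages get redelivered.
    message.ack()
    return result
```

Because the extension runs on its own thread, no single block of the processing code has to finish within the deadline, as long as `interval` stays well below `extend_by`.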
Possible solutions that haven't been looked into:
- Using the Java Client Library since it comes with better controls on pulling and consuming messages
- Using the underlying API calls and classes of the Python Client Library
- Building a synchronization storage that orchestrates the competing consumers
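For the second idea (using the underlying API calls), a minimal sketch of pulling exactly one message at a time might look like this. It assumes the request-dict calling style of newer `google-cloud-pubsub` releases (`pull(request=...)`, `acknowledge(request=...)`); older releases such as the one discussed here took positional arguments instead, so treat the exact signatures as an assumption. The function name and the `work` callable are hypothetical, and the client is passed in rather than created.

```python
def pull_and_process_one(subscriber, subscription_path, work):
    """Pull at most one message, process it, then acknowledge it.

    Returns True if a message was handled, False if none was available.
    """
    # Ask for a single message so nothing else is buffered on this instance.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 1}
    )
    if not response.received_messages:
        return False

    received = response.received_messages[0]
    work(received.message.data)  # the payload is raw bytes

    # Acknowledge only after the work has finished without raising.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": [received.ack_id]}
    )
    return True
```

Since each instance holds only the message it is currently working on, the remaining messages stay available to the other competing consumers. Long-running work would still need its deadline extended while it runs.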
[...] open() method instead of using subscriber.subscribe() as the only future, passing to it the callback function which you want to call with each message. The documentation offers a specific example of pull subscription using FlowControl, and it does not require the usage of open(). Then, you can maybe specify a lower limit of max_messages in order to increase throughput. – dsesto
[...] open() is in order to block the main thread. But future.result() will block anyway. – Matthias
[...] result() method blocks the execution while messages are coming in, while in reality you want the subscriber to be non-blocking so that it can process messages in the background, so I would remove that part of the code too. Is there any specific reason why you are using (1) open() and (2) result()? I think the issue can be related to how you are handling the background execution of the message processing in your callback function, which should be non-blocking. Finally, which version of the library are you using? The latest is 0.34.0. – dsesto
[...] open() method does, as I am not able to find any reference or documentation for it, but from your use case I understand you want non-blocking calls. Please correct me if I misunderstood the situation. – dsesto
[...] subscribe method so you had to use open. I will check the new API though. The future.result() is used to catch any global exception as described here. – Matthias