We're using PubSub in prod and are seeing more VMs handling PubSub messages than we would expect.
I ran some simple overnight tests with PubSub, and it looks like the rate-limiting (flow control) mechanism doesn't behave the way we expected.
Here is the test:
- Publish messages to a topic that has a pull subscription. In this experiment there were about 2,700 messages (publishing started at approximately 9 pm).
- Configure one async client that uses a StreamingPull connection, with FlowControl (max outstanding messages) set to 2.
- Simulate roughly 5 seconds of processing per incoming message by moving the work onto a timer and acknowledging the message only when the timer fires (the handler below uses a random delay of 3 to 8 seconds).
Expected result: messages are consumed from PubSub at a steady rate, 2 at a time, with a new pair roughly every 5 seconds. A short delay between acking a message and pulling the next one is expected because of network and processing overhead.
Actual result: PubSub starts throttling (or something similar), with very long pauses during which no messages arrive at all. The length of the pause depends on the number of unacked messages in the subscription.
The FlowControl docs don't make this behavior clear.
Here is the consumer (client) code:
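To put a number on the expected result: with at most 2 messages in flight and a nominal 5 seconds each, 2,700 messages need 1,350 rounds, or about 6,750 s (roughly 1 h 52 min), ignoring network and ack overhead. The sketch below only does that arithmetic; the class and method names are made up for illustration, and the real handler's random 3 to 8 s delay would make the figure somewhat longer.

```java
import java.time.Duration;

// Illustrative arithmetic only: assumes every message takes exactly `perMessage`
// and that network/ack overhead is zero.
final class ExpectedDrainTime {

    /** Time to drain `messageCount` messages with at most `maxOutstanding` in flight. */
    static Duration estimate(long messageCount, long maxOutstanding, Duration perMessage) {
        if (maxOutstanding <= 0) {
            throw new IllegalArgumentException("maxOutstanding must be positive");
        }
        long rounds = (messageCount + maxOutstanding - 1) / maxOutstanding; // ceiling division
        return perMessage.multipliedBy(rounds);
    }

    public static void main(String[] args) {
        Duration total = estimate(2_700, 2, Duration.ofSeconds(5));
        System.out.println(total.toMinutes() + " min"); // prints "112 min" (1,350 rounds x 5 s = 6,750 s)
    }
}
```

Any pause far beyond this per-round budget is the unexplained part.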
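For comparison, the behavior the settings were expected to produce can be modeled locally with a semaphore: N permits stand in for maxOutstandingElementCount, `acquire()` blocking stands in for LimitExceededBehavior.Block, and a scheduled "ack" releases a permit. This is a toy model under those assumptions, not how the Pub/Sub client works internally; it leaves out the server side entirely. `FlowControlModel` and `run` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Local model only (NOT the Pub/Sub client's implementation): a new message is
// handed out only after one of the N in-flight messages has been acked.
final class FlowControlModel {

    /** Delivers `messages` simulated messages and returns the peak number in flight. */
    static int run(int messages, int maxOutstanding, long handleMillis) throws InterruptedException {
        Semaphore slots = new Semaphore(maxOutstanding);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch acked = new CountDownLatch(messages);
        try {
            for (int i = 0; i < messages; i++) {
                slots.acquire(); // blocks until one of the outstanding messages is acked
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                // Simulated handler: "ack" after handleMillis, which frees a slot.
                timer.schedule(() -> {
                    inFlight.decrementAndGet();
                    acked.countDown();
                    slots.release();
                }, handleMillis, TimeUnit.MILLISECONDS);
            }
            acked.await(); // wait for the last round of acks
        } finally {
            timer.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        int peak = run(6, 2, 500);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // 6 messages, 2 at a time, 500 ms each: 3 rounds, so at least 1,500 ms
        System.out.println("peak=" + peak + ", elapsed=" + elapsedMs + " ms");
    }
}
```

In this model the gap between an ack and the next delivery is essentially zero, which is the steady cadence described in the expected result.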
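import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;

// At most N outstanding (unacked) messages in this client; with Block, the client
// waits instead of throwing when that limit is reached.
var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
var flowSettings = FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(concurrentFlowsNumber)
        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
        .build();
var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
        .setCredentialsProvider(() -> serviceAccountCredentials)
        .setFlowControlSettings(flowSettings)
        .build();
subscriber.addListener(
        new Subscriber.Listener() {
            @Override
            public void failed(ApiService.State from, Throwable failure) {
                logger.error(failure);
            }
        },
        MoreExecutors.directExecutor());
var apiService = subscriber.startAsync();
apiService.addListener(new ApiService.Listener() {
    @Override
    public void running() {
        logger.info("Pubsub started");
    }

    @Override
    public void failed(ApiService.State from, Throwable failure) {
        // Pass the Throwable as the last argument so its stack trace is logged too.
        logger.error("Pubsub failed on step: {}", from, failure);
    }
}, Runnable::run);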
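And here is the message handler:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

private static final Random rand = new Random();

// Simulates 3-8 s of processing, then acks so the flow-control slot is released.
private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
    new Timer().schedule(new TimerTask() {
        @Override
        public void run() {
            consumer.ack();
        }
    }, 3000L + rand.nextInt(5000));
}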
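Separately from the throttling question, `new Timer()` in the handler starts a new background thread for every message. A variant that shares one ScheduledExecutorService keeps the same 3 to 8 s simulated delay without that overhead. To stay self-contained it takes the ack as a plain Runnable (in the real receiver that would be `consumer::ack`); `SimulatedHandler` is a hypothetical name, and nothing here is claimed to change the pauses described above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: simulates per-message work with a random delay, like the
// Timer-based handler, but all messages share a single scheduler thread.
final class SimulatedHandler {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long minMillis;
    private final long maxMillis;

    SimulatedHandler(long minMillis, long maxMillis) {
        if (minMillis >= maxMillis) {
            throw new IllegalArgumentException("minMillis must be < maxMillis");
        }
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
    }

    /** Runs `ack` after a random delay in [minMillis, maxMillis). */
    ScheduledFuture<?> handle(Runnable ack) {
        long delay = ThreadLocalRandom.current().nextLong(minMillis, maxMillis);
        return scheduler.schedule(ack, delay, TimeUnit.MILLISECONDS);
    }

    /** Stops accepting new work; already scheduled acks still run. */
    void shutdown() {
        scheduler.shutdown();
    }
}
```

Usage, assuming the receiver is written as a lambda: `var handler = new SimulatedHandler(3_000, 8_000);` and then `MessageReceiver receiver = (message, consumer) -> handler.handle(consumer::ack);`.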
So, does anyone know how to make the clients (many VMs) consume messages with a concurrency limit (up to 4 concurrent messages) without stalling on these long pauses?
P.S. These questions are similar, but not the same:
- Google pubsub flow control
- pubsub Dynamic rate limiting
- Cloud pubsub slow poll rate