
We have a publisher/subscriber system based on GCP Pub/Sub. The subscriber takes quite long to process a single message, about 1 minute. We have already set the subscription's ack deadline to 600 seconds (10 minutes), the maximum allowed, to make sure Pub/Sub does not start redelivery too early, since this is essentially a long-running operation.
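
For reference, this is roughly how such a subscription can be created with that deadline (a sketch only; the project, topic and subscription names are placeholders, and newer library versions also expose a higher-level admin API):

const pubsub = require("@google-cloud/pubsub");

// Create the subscription with the maximum ack deadline of 600 seconds.
// All resource names below are placeholders.
async function createLongRunningSubscription () {
    const client = new pubsub.v1.SubscriberClient();

    await client.createSubscription({
        name: "projects/my-project/subscriptions/my-subscription",
        topic: "projects/my-project/topics/my-topic",
        ackDeadlineSeconds: 600
    });
}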

I'm seeing the following behavior of Pub/Sub: while the code sends the ack, and monitoring confirms that the acknowledgement request has been accepted and the acknowledgement itself completed with a success status, the total number of unacked messages stays the same.

[Screenshot: chart of the subscription's unacknowledged messages metric]

The metric charts show the same picture for the sum, count, and mean aggregation aligners. In the screenshot above the aligner is mean and no reducer is enabled.

I'm using the @google-cloud/pubsub Node.js library. Different versions have been tried (0.18.1, 0.22.2, 0.24.1), but I don't think the issue is in them.

The following class can be used to reproduce the behavior.

TypeScript 3.1.1, Node 8.x.x - 10.x.x

import { exponential, Backoff } from "backoff";

const pubsub = require("@google-cloud/pubsub");

export interface IMessageHandler {
    handle (message): Promise<void>;
}

export class PubSubSyncListener {
    private readonly client;

    private listener: Backoff;

    private runningOperations: Promise<unknown>[] = [];

    constructor (
        private readonly handler: IMessageHandler,
        private readonly options: {
            /**
             * Maximal number of messages to be processed simultaneously.
             * The listener will try to keep the number of in-flight messages
             * as close to the provided value as possible.
             */
            maxMessages: number;
            /**
             * Formatted full subscription name: projects/{projectName}/subscriptions/{subscriptionName}
             */
            subscriptionName: string;
            /**
             * In milliseconds
             */
            minimalListenTimeout?: number;
            /**
             * In milliseconds
             */
            maximalListenTimeout?: number;
        }
    ) {
        this.client = new pubsub.v1.SubscriberClient();

        this.options = Object.assign({
            minimalListenTimeout: 300,
            maximalListenTimeout: 30000
        }, this.options);
    }

    public async listen () {
        this.listener = exponential({
            maxDelay: this.options.maximalListenTimeout,
            initialDelay: this.options.minimalListenTimeout
        });

        this.listener.on("ready", async () => {
            if (this.runningOperations.length < this.options.maxMessages) {
                const [response] = await this.client.pull({
                    subscription: this.options.subscriptionName,
                    maxMessages: this.options.maxMessages - this.runningOperations.length
                });

                for (const m of response.receivedMessages) {
                    this.startMessageProcessing(m);
                }
                this.listener.reset();
                this.listener.backoff();
            } else {
                this.listener.backoff();
            }
        });

        this.listener.backoff();
    }

    private startMessageProcessing (message) {
        // Remove the tracked promise once it settles. Earlier completions shift
        // the array indices, so look the entry up by identity rather than
        // capturing an index at push time.
        const removeFromRunning = () => {
            const index = this.runningOperations.indexOf(operation);
            if (index !== -1) {
                this.runningOperations.splice(index, 1);
            }
        };

        const operation = this.handler.handle(this.getHandlerMessage(message))
            .then(removeFromRunning, removeFromRunning);

        this.runningOperations.push(operation);
    }

    private getHandlerMessage (message) {
        message.message.ack = async () => {
            const ackRequest = {
                subscription: this.options.subscriptionName,
                ackIds: [message.ackId]
            };

            await this.client.acknowledge(ackRequest);
        };

        return message.message;
    }

    public async stop () {
        this.listener.reset();
        this.listener = null;
        await Promise.all(
            this.runningOperations
        );
    }
}
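
For completeness, this is roughly how the listener is wired up in the reproduction (the subscription name and the handler body are placeholders):

async function main () {
    const listener = new PubSubSyncListener(
        {
            // The handler does roughly one minute of work and then acks.
            handle: async (message) => {
                await doLongRunningWork(message); // placeholder for the real processing
                await message.ack();
            }
        },
        {
            maxMessages: 5,
            subscriptionName: "projects/my-project/subscriptions/my-subscription"
        }
    );

    await listener.listen();
}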

This is basically a partial implementation of asynchronous pulling of messages with immediate acknowledgment, since one of the proposed solutions was to use synchronous pulling.

I found a similar issue reported in the Java repository, if I'm not mistaken about the symptoms:

https://github.com/googleapis/google-cloud-java/issues/3567

The last detail here is that acknowledgment seems to work when the number of requests is low. If I fire a single message into Pub/Sub and then immediately process it, the number of undelivered messages decreases (drops to 0, since only one message was there before).

The question itself: what is happening, and why is the number of unacked messages not decreasing as it should when the ack has been received?


1 Answer


To quote from the documentation, the subscription/num_undelivered_messages metric that you're using is the "Number of unacknowledged messages (a.k.a. backlog messages) in a subscription. Sampled every 60 seconds. After sampling, data is not visible for up to 120 seconds."

You should not expect this metric to decrease immediately upon acking a message. In addition, it sounds as if you are trying to use Pub/Sub for an exactly-once delivery case, attempting to ensure the message will not be delivered again. Cloud Pub/Sub does not provide these semantics; it provides at-least-once semantics. In other words, even if you have received a message, acked it, received the ack response, and seen the metric drop from 1 to 0, it is still possible and correct for the same worker or another to receive an exact duplicate of that message. Although in practice this is unlikely, you should focus on building a system that is duplicate tolerant instead of trying to ensure your ack succeeded so your message won't be redelivered.
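
For example, one way to make the handler tolerant of redelivery is to deduplicate on the message id before doing the work. The sketch below reuses the IMessageHandler interface from the question; the import path is a placeholder, and the in-memory set stands in for a durable store shared between workers (for example a database table with a unique constraint on the message id):

import { IMessageHandler } from "./pub-sub-sync-listener"; // placeholder path

export class IdempotentHandler implements IMessageHandler {
    // Placeholder for a persistent, shared deduplication store.
    private readonly processedIds = new Set<string>();

    constructor (private readonly inner: IMessageHandler) {}

    public async handle (message): Promise<void> {
        if (this.processedIds.has(message.messageId)) {
            // Duplicate delivery: acknowledge again and skip the work.
            await message.ack();
            return;
        }

        await this.inner.handle(message);
        this.processedIds.add(message.messageId);
    }
}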