3 votes

How to prevent duplicate messages in Google Cloud Pub/Sub?

Say I have code that handles the messages it is subscribed to.

Say I have two nodes running the same service with this code.

Once one node has received a message but not yet acknowledged it, the other node receives the same message. This is where the problem arises: we end up with two duplicate messages being processed.

void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {

        submitHandler.handle(toMessage(pubsubMessage))
                .doOnSuccess((response) -> {
                    log.info("Acknowledging the successfully processed message id: {}, response {}",
                            pubsubMessage.getMessageId(), response);
                    ackReply.ack();  // <---- acknowledged
                })
                .doOnError((e) -> {
                    log.error("Not acknowledging due to an exception", e);
                    ackReply.nack(); // <---- message will be redelivered
                })
                .doOnTerminate(span::finish) // span is a tracing span held elsewhere
                .subscribe();
    }

What is the solution for this? Is it normal behaviour?

3
What is the Acknowledgement Deadline? – John Hanley
I did not set it up specifically, so it should be 10 seconds by default. – ses

3 Answers

6 votes

Google Cloud Pub/Sub uses "At-Least-Once" delivery. From the docs:

Typically, Cloud Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages.

This means it guarantees the message will be delivered one or more times, so you can potentially receive the same message multiple times if you don't pipe it through something else that deduplicates it first. There isn't a setting you can define to guarantee exactly-once delivery. The docs do say you can get the behavior you want using Cloud Dataflow's PubsubIO, but that solution appears to be deprecated:

You can achieve exactly once processing of Cloud Pub/Sub message streams using Cloud Dataflow PubsubIO. PubsubIO de-duplicates messages on custom message identifiers or those assigned by Cloud Pub/Sub.
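Coming back to the idempotency point from the first quote, here is a minimal sketch of what an idempotent receiver can look like. It is hypothetical and single-process only (the class and the seen set are not from the question; for the two-node setup the set would have to live in a shared store instead), but it shows the shape: work is keyed on the server-assigned message ID, so a redelivery becomes a no-op.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.pubsub.v1.PubsubMessage;

    class DedupReceiver {
        // in-memory record of message IDs this process has already handled;
        // only valid within a single process, shown for illustration
        private final Set<String> seen = ConcurrentHashMap.newKeySet();

        void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {
            if (!seen.add(pubsubMessage.getMessageId())) {
                ackReply.ack(); // duplicate delivery: acknowledge again and skip the work
                return;
            }
            // first delivery of this ID: process, then ack on success as in the question
        }
    }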

Having said all of this, I've never actually seen Google Cloud Pub/Sub send a message twice. Are you sure that's really the problem you're having, or is the message being redelivered because you are not acknowledging it within the Acknowledgement Deadline (as you stated above, this defaults to 10 seconds)? If you don't acknowledge it, it will be redelivered. From the docs:

A subscription is created for a single topic. It has several properties that can be set at creation time or updated later, including:

  • An acknowledgment deadline: If your code doesn't acknowledge the message before the deadline, the message is sent again. The default is 10 seconds. The maximum custom deadline you can specify is 600 seconds (10 minutes).

If that's the situation, just acknowledge your messages within the deadline and you won't see these duplicates as often.
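If your handler legitimately needs more than 10 seconds, you can set a larger deadline when creating the subscription. A minimal sketch with the google-cloud-pubsub Java admin client (the project, topic, and subscription names are placeholders):

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PushConfig;
    import com.google.pubsub.v1.TopicName;

    try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
        client.createSubscription(
                ProjectSubscriptionName.of("my-project", "my-subscription"),
                TopicName.of("my-project", "my-topic"),
                PushConfig.getDefaultInstance(), // default PushConfig = pull subscription
                600);                            // ack deadline in seconds (600 is the max)
    }

For what it's worth, the high-level Java Subscriber client also extends the ack deadline automatically while your callback is running, so slow processing alone does not always explain redeliveries.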

3 votes

You can use Redis from Memorystore to deduplicate messages. Your publisher should add a trace ID to the message just before publishing it to Pub/Sub. On the other side, the client (subscriber) should check whether the trace ID is already in the cache and, if so, skip the message. If it is not there, process the message and add the trace ID to the cache with a 7-8 day expiry time (the Pub/Sub retention deadline is 7 days). In this simple way you can ensure each message is processed only once.
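A minimal sketch of that pattern with the Jedis client (the jedisPool wiring and the traceId attribute name are assumptions; an atomic SET with NX and EX avoids a race between the check and the insert):

    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.pubsub.v1.PubsubMessage;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.params.SetParams;

    class DedupSubscriber {
        private final JedisPool jedisPool = new JedisPool("localhost", 6379);

        void messageReceiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReply) {
            // fall back to the server-assigned message ID if the publisher set no trace ID
            String traceId = pubsubMessage.getAttributesOrDefault("traceId",
                    pubsubMessage.getMessageId());
            try (Jedis jedis = jedisPool.getResource()) {
                // SET ... NX EX succeeds only if the key is absent, and expires it
                // after 8 days, i.e. longer than Pub/Sub's 7-day retention
                String reply = jedis.set("dedup:" + traceId, "1",
                        SetParams.setParams().nx().ex(8 * 24 * 60 * 60));
                if (reply == null) {
                    ackReply.ack(); // already handled somewhere: ack and skip
                    return;
                }
            }
            // first sighting of this trace ID: process, then ack on success
        }
    }

One trade-off worth noting: marking the ID before processing means a message whose processing then fails will be skipped on redelivery; marking it only after successful processing (just before the ack) avoids that, at the cost of a wider duplicate window.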

1 vote

All messages in a given topic have a unique messageID field:

ID of this message, assigned by the server when the message is published. Guaranteed to be unique within the topic. This value may be read by a subscriber that receives a PubsubMessage via a subscriptions.pull call or a push delivery. It must not be populated by the publisher in a topics.publish call.

You can use it to deduplicate incoming messages. There is no need to assign IDs manually.

It is a bit harder in distributed systems (e.g. multiple instances of consumers for a given subscription). You would need a global synchronization mechanism; the simplest is to set up a database (e.g. Redis) and use it to keep the IDs of processed messages.

You should take a look at Replaying and discarding messages which describes how to configure message retention.

There are two properties of subscription:

  • retain_acked_messages - whether to keep acknowledged messages,
  • message_retention_duration - how long to keep messages.

If you do not plan to rewind your subscription to a past point in time, e.g. if you do not plan to reprocess messages or expect bugs forcing you to reset your subscription, you can set retain_acked_messages=false and message_retention_duration='3600s', as sketched below. This way you only need to keep the last hour's worth of message IDs.
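A sketch of setting those two properties with the Java admin client (the subscription name is a placeholder):

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.protobuf.Duration;
    import com.google.protobuf.FieldMask;
    import com.google.pubsub.v1.Subscription;
    import com.google.pubsub.v1.UpdateSubscriptionRequest;

    try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
        Subscription subscription = Subscription.newBuilder()
                .setName("projects/my-project/subscriptions/my-subscription")
                .setRetainAckedMessages(false)                                 // drop acked messages
                .setMessageRetentionDuration(Duration.newBuilder().setSeconds(3600)) // keep 1 hour
                .build();
        client.updateSubscription(UpdateSubscriptionRequest.newBuilder()
                .setSubscription(subscription)
                .setUpdateMask(FieldMask.newBuilder()       // update only these two fields
                        .addPaths("retain_acked_messages")
                        .addPaths("message_retention_duration"))
                .build());
    }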

Bear in mind that a Pub/Sub message also has a publish_time field, so you don't need to add a timestamp to your message's data. It can be used together with message_id. Both of these are set by the Pub/Sub server when it receives the message.