
I have a Cloud Function that publishes a message to Pub/Sub, which triggers a Cloud Run service to run an archive-file process. When the files are large, my Cloud Run Python code takes some time to process the data, and it looks like Pub/Sub is retrying the message after 20 seconds (the default acknowledgement deadline), which triggers another instance of my Cloud Run service. I've increased the acknowledgement deadline to 600s and redeployed everything, but it's still retrying the message after 20 seconds. Am I missing anything?

Cloud Function code that publishes the message:

# Publishes a message
try:
    publish_future = publisher.publish(topic_path, data=message_bytes)
    publish_future.result()  # Verify the publish succeeded
    return 'Message published.'
except Exception as e:
    print(e)
    return (e, 500)

Here is the Pub/Sub subscription config: [screenshot of the subscription configuration]

Logging showing a second instance being triggered after 20s: [screenshot of the logs]

Cloud Run code:

import base64
import json

from flask import Flask, request

app = Flask(__name__)


@app.route("/", methods=["POST"])
def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = envelope["message"]

    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        # Decode base64-encoded event data
        event_data = base64.b64decode(pubsub_message["data"]).decode("utf-8")
        message = json.loads(event_data)

        # logic to process data/archive
        return ("", 204)

    # No data in the message: return an error instead of falling through
    return ("", 400)



Comments:

guillaume blaquiere: Can you share how you handle the Pub/Sub message in Cloud Run?

CaioT: Hi @guillaume, I've edited my main post. It's a very basic Flask app which takes the request and returns 204 once done (same example as the Google docs). Thank you!

guillaume blaquiere: Are you sure that comes from your Pub/Sub push subscription? Do you have several subscriptions that send messages to your Cloud Run service? Can you also try to purge the subscription and try again with new messages? And are you sure it is the same message?

CaioT: Yes, it's coming from Pub/Sub, but a pull subscription. I printed the request from request.get_json() and I saw the message coming in twice (same messageId). One thing though: I just changed the retry policy to a 60s minimum backoff and I no longer see the duplicated message, but it could still happen if processing takes more than 60s.

guillaume blaquiere: You talk about pull, but your screenshot is a push subscription. You lost me!!

1 Answer


You should be able to control the retries by setting the subscription's minimumBackoff retry policy. You can set minimumBackoff to its maximum of 600 seconds, matching your ack deadline, so that redelivered messages arrive at least 600 seconds apart. This should reduce the number of duplicate invocations you see.
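As a sketch, the retry policy can also be updated programmatically with the Pub/Sub admin client (this assumes the google-cloud-pubsub v2 Python client; the project and subscription names are placeholders):

```python
from google.cloud import pubsub_v1
from google.protobuf import duration_pb2, field_mask_pb2

subscriber = pubsub_v1.SubscriberClient()

# Placeholder subscription path; replace with your own project/subscription.
subscription = pubsub_v1.types.Subscription(
    name="projects/my-project/subscriptions/my-subscription",
    retry_policy=pubsub_v1.types.RetryPolicy(
        minimum_backoff=duration_pb2.Duration(seconds=600),
        maximum_backoff=duration_pb2.Duration(seconds=600),
    ),
)

# Only update the retry_policy field, leaving the rest of the
# subscription configuration untouched.
update_mask = field_mask_pb2.FieldMask(paths=["retry_policy"])
subscriber.update_subscription(
    request={"subscription": subscription, "update_mask": update_mask}
)
```

The same change can be made from the console screenshot's "Retry policy" section or with `gcloud pubsub subscriptions update` and its `--min-retry-delay`/`--max-retry-delay` flags.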

To handle the duplicates that can still occur, make your subscriber idempotent: keep a record of the messageIds you have already processed and skip any message you have seen before.
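A minimal sketch of such a check, assuming a push-style envelope like the one in the question. The in-memory set is illustrative only; a real deployment would need a shared, durable store (e.g. Firestore or Redis), since Cloud Run can run multiple instances:

```python
import base64
import json

# Illustrative in-memory dedup store; in production this must be durable
# and shared across instances.
processed_ids = set()


def handle_message(envelope):
    """Process a Pub/Sub push envelope at most once per messageId."""
    pubsub_message = envelope["message"]
    message_id = pubsub_message["messageId"]

    if message_id in processed_ids:
        # Duplicate delivery: return a 2xx to ack it without reprocessing.
        return ("", 204)

    processed_ids.add(message_id)
    data = json.loads(base64.b64decode(pubsub_message["data"]).decode("utf-8"))
    # ... archive-processing logic would go here ...
    return ("", 204)
```

With this in place, a redelivered message with the same messageId is acknowledged immediately instead of kicking off the archive process a second time.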

From the Pub/Sub documentation on at-least-once delivery:

Typically, Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages. You can achieve exactly once processing of Pub/Sub message streams using the Apache Beam programming model. The Apache Beam I/O connectors let you interact with Cloud Dataflow via controlled sources and sinks. You can use the Apache Beam PubSubIO connector (for Java and Python) to read from Cloud Pub/Sub. You can also achieve ordered processing with Cloud Dataflow by using the standard sorting APIs of the service. Alternatively, to achieve ordering, the publisher of the topic to which you subscribe can include a sequence token in the message.