0
votes

I would like some information about configuring the publisher in GCP Pub/Sub. I want to enqueue messages that will be consumed by a Google Cloud Function. To achieve this, publication should be triggered when a certain number of messages is reached or after a certain amount of time.

I configured the topic as follows:

topic.PublishSettings = pubsub.PublishSettings{
    ByteThreshold:  1e6,              // Publish a batch when its size in bytes reaches this value (1e6 bytes = 1 MB).
    CountThreshold: 100,              // Publish a batch when it has this many messages.
    DelayThreshold: 10 * time.Second, // Publish a non-empty batch after this delay has passed.
}

When I call the publish function, there is a 10-second delay on each call. Messages are not added to the queue...

for _, v := range list {
    ctx := context.Background()
    res := a.Topic.Publish(ctx, &pubsub.Message{Data: v})

    // Block until the result is returned and a server-generated
    // ID is returned for the published message.
    serverID, err = res.Get(ctx)
    if err != nil {
        return "", err
    }
}

Can someone help me?

Cheers

1
When you say "I have a 10 second delay on each call," do you mean that res.Get is returning after 10 seconds? When you say "Messages are not added to the queue," what do you mean? res.Get is not returning? It is returning an error? Your subscriber is not receiving the message? Additionally, what does "stack messages" mean? You want the messages all in a single batch that is processed in a Cloud Function as a unit? - Kamal Aboul-Hosn
No, there is a delay of 10 seconds between each publication. res.Get is returning normally. I want to batch all requests, as described here (stackoverflow.com/questions/49070836/batching-pubsub-requests). Then I will trigger a Cloud Function (subscriber) to minimize the cost of processing messages. My understanding was that, for example, 3 messages would be put in the queue and then published together after 10 seconds. - anthony44

1 Answer

1
votes

Batching on the publisher side is designed to allow for more cost efficiency when sending messages to Google Cloud Pub/Sub. Given that the minimum billing unit for the service is 1KB, it can be cheaper to send multiple messages in the same Publish request. For example, sending two 0.5KB messages as separate Publish requests would result in being charged for sending 2KB of data (1KB for each). If one were to batch them into a single Publish request, it would be charged as 1KB of data.
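The arithmetic in that example can be checked with a small sketch. It assumes a simplified billing model taken only from the paragraph above (each Publish request is charged for its size, but never less than the minimum unit) and takes 1KB as 1024 bytes for illustration; `billedBytes` and `minBilledBytes` are hypothetical names, not part of any Google API.

```go
package main

import "fmt"

// minBilledBytes is the minimum billing unit from the answer (1KB),
// assumed here to be 1024 bytes for illustration.
const minBilledBytes = 1024

// billedBytes returns the bytes charged for a set of Publish requests under
// the simplified model: each request is billed for its own size, but never
// for less than minBilledBytes.
func billedBytes(requestSizes []int) int {
	total := 0
	for _, size := range requestSizes {
		if size < minBilledBytes {
			size = minBilledBytes
		}
		total += size
	}
	return total
}

func main() {
	separate := billedBytes([]int{512, 512}) // two requests of 0.5KB each
	batched := billedBytes([]int{512 + 512}) // one request carrying both messages
	fmt.Println(separate, batched)           // prints "2048 1024"
}
```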

The tradeoff with batching is latency: in order to fill up batches, the publisher has to wait to receive more messages to batch together. The three batching properties (ByteThreshold, CountThreshold, and DelayThreshold) allow one to control the level of that tradeoff. The first two properties control how much data or how many messages we put in a single batch. The last property controls how long the publisher should wait to send a batch.

As an example, imagine you have CountThreshold set to 100. If you are publishing few messages, it could take a while to receive 100 messages to send as a batch. This means that the latency for messages in that batch will be higher because they are sitting in the client waiting to be sent. With DelayThreshold set to 10 seconds, a batch would be sent if it had 100 messages in it or if the first message in the batch was received at least 10 seconds ago. DelayThreshold therefore puts a limit on the amount of latency introduced in order to have more data in an individual batch.
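The rule described above can be written down as a small decision function. This is only a model of the behaviour as the answer describes it, not the client library's actual implementation; `batchSettings`, `batchState`, and `shouldFlush` are hypothetical names, and `exampleSettings` copies the values from the question.

```go
package main

import (
	"fmt"
	"time"
)

// batchSettings mirrors the three PublishSettings fields discussed above.
type batchSettings struct {
	byteThreshold  int
	countThreshold int
	delayThreshold time.Duration
}

// batchState describes a pending batch (hypothetical type for this sketch).
type batchState struct {
	bytes int
	count int
	age   time.Duration // time since the first message entered the batch
}

// exampleSettings uses the values from the question.
var exampleSettings = batchSettings{
	byteThreshold:  1e6,
	countThreshold: 100,
	delayThreshold: 10 * time.Second,
}

// shouldFlush reports whether a non-empty batch has hit any of the
// three thresholds and would therefore be sent.
func shouldFlush(s batchSettings, b batchState) bool {
	if b.count == 0 {
		return false // nothing to send
	}
	return b.bytes >= s.byteThreshold ||
		b.count >= s.countThreshold ||
		b.age >= s.delayThreshold
}

func main() {
	// 3 small messages, 2 seconds old: still waiting.
	fmt.Println(shouldFlush(exampleSettings, batchState{bytes: 300, count: 3, age: 2 * time.Second}))
	// 100 messages: the count threshold is reached.
	fmt.Println(shouldFlush(exampleSettings, batchState{bytes: 10000, count: 100, age: time.Second}))
	// 1 message that has waited 10 seconds: the delay threshold is reached.
	fmt.Println(shouldFlush(exampleSettings, batchState{bytes: 100, count: 1, age: 10 * time.Second}))
}
```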

The code as you have it is going to result in batches with only a single message that each take 10 seconds to publish. The reason is the call to res.Get(ctx), which will block until the message has been successfully sent to the server. With CountThreshold set to 100 and DelayThreshold set to 10 seconds, the sequence that is happening inside your loop is:

  1. A call to Publish puts a message in a batch to publish.
  2. That batch is waiting to receive 99 more messages or for 10 seconds to pass before sending the batch to the server.
  3. The code is waiting for this message to be sent to the server and return with a serverID.
  4. Given the code doesn't call Publish again until res.Get(ctx) returns, it waits 10 seconds to send the batch.
  5. res.Get(ctx) returns with a serverID for the single message.
  6. Go back to 1.
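To make the cost of that loop concrete, here is a rough virtual-time model of the sequence above. It assumes that sending a batch is instantaneous and that the byte threshold is never reached; `simulate` and `questionDelay` are hypothetical names introduced for this sketch, and nothing here calls the real client library.

```go
package main

import (
	"fmt"
	"time"
)

// questionDelay is the DelayThreshold used in the question.
const questionDelay = 10 * time.Second

// simulate models how many batches are sent and how much time passes when
// publishing n messages. With waitAfterEach set, every Publish is followed
// by a blocking Get, as in the question's loop.
func simulate(n, countThreshold int, delay time.Duration, waitAfterEach bool) (batches int, elapsed time.Duration) {
	if waitAfterEach {
		// Each batch holds a single message, so n batches are sent.
		if countThreshold <= 1 {
			return n, 0 // a one-message batch is already full
		}
		// Otherwise every batch waits out the full delay before it is sent.
		return n, time.Duration(n) * delay
	}
	// All messages are queued back to back before any Get is called.
	batches = n / countThreshold // full batches are sent immediately
	if n%countThreshold != 0 {
		batches++       // the remaining partial batch...
		elapsed = delay // ...is sent once the delay expires
	}
	return batches, elapsed
}

func main() {
	b, e := simulate(3, 100, questionDelay, true)
	fmt.Println(b, e) // prints "3 30s"
	b, e = simulate(3, 100, questionDelay, false)
	fmt.Println(b, e) // prints "1 10s"
}
```

Under this model, three messages published with a Get after each one produce three single-message batches over 30 seconds, while queuing all three first produces one batch after 10 seconds.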

If you actually want to batch messages together, you can't call res.Get(ctx) before the next Publish call. You'll want to either call Publish inside a goroutine (so one goroutine per message) or amass the res objects in a list and then call Get on them outside the loop, e.g.:

    var res []*pubsub.PublishResult
    ctx := context.Background()
    for _, v := range list {
        res = append(res, a.Topic.Publish(ctx, &pubsub.Message{Data: v}))
    }
    for _, r := range res  {
        serverID, err = r.Get(ctx)
        if err != nil {
            return "", err
        }
    }
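The goroutine alternative could look roughly like the sketch below. To keep it runnable without a Pub/Sub project, the real client is replaced by stand-ins: `result` mirrors only the Get method used above, `publishFunc` stands in for a.Topic.Publish, and `fakeResult` and `demo` are test doubles. All of these names are hypothetical; with the real library you would call Publish and Get inside the goroutine in the same way.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// result mirrors the one method of the publish result used in the answer.
type result interface {
	Get(ctx context.Context) (serverID string, err error)
}

// publishFunc stands in for a.Topic.Publish (hypothetical adapter).
type publishFunc func(ctx context.Context, data []byte) result

// publishConcurrently starts one goroutine per message, so no Get call
// prevents the next Publish from joining the same batch.
func publishConcurrently(ctx context.Context, publish publishFunc, list [][]byte) ([]string, error) {
	ids := make([]string, len(list))
	errs := make([]error, len(list))
	var wg sync.WaitGroup
	for i, v := range list {
		wg.Add(1)
		go func(i int, v []byte) {
			defer wg.Done()
			// Each goroutine writes only to its own index, so no lock is needed.
			ids[i], errs[i] = publish(ctx, v).Get(ctx)
		}(i, v)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return ids, nil
}

// fakeResult is a test double that resolves immediately.
type fakeResult struct{ id string }

func (r fakeResult) Get(context.Context) (string, error) { return r.id, nil }

// demo publishes three payloads through a fake publisher.
func demo() ([]string, error) {
	fake := func(_ context.Context, data []byte) result {
		return fakeResult{id: "id-" + string(data)}
	}
	list := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	return publishConcurrently(context.Background(), fake, list)
}

func main() {
	ids, err := demo()
	fmt.Println(ids, err) // prints "[id-a id-b id-c] <nil>"
}
```

Collecting results in a slice, as in the answer's own snippet, is simpler when the messages are already in hand; the goroutine form is mostly useful when messages arrive from independent callers.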

Something to keep in mind is that batching will optimize cost on the publish side, not on the subscribe side. Cloud Functions is built with push subscriptions. This means that messages must be delivered to the subscriber one at a time (since the response code is what is used to ack or nack each message), which means there is no batching of messages delivered to the subscriber.