4 votes

We began to notice that the number of "unacknowledged messages" for a certain topic/subscription increases from time to time, according to the Stackdriver charts.

Symptoms

I don't know how much we can trust the Stackdriver charts, but I've already checked that:

  • the pull operation count is as high as the publish operation count
  • the ack operation count is lower than the pull operation count when the problem occurs

Also, according to our logs, Pub/Sub actually sends the same message multiple times, which also confirms that the pull succeeds but the ack probably does not.

So I think we can assume that, from GCP's perspective, our system pulls promptly but does not ACK properly.

I checked the possibility that we are not sending the ACK on time, but I don't think that is the case, as the flow below shows.

In the problematic subscription, messages accumulate for several hours. For us, this is a severe problem.

Implementation Details

We use the pull method for certain reasons, and we're reluctant to switch to the push method unless there is a good rationale. For each subscription, we have one message-pumping goroutine, and this goroutine spawns a worker for each pulled message. To be more specific:

// in a dedicated message-pumping goroutine (error handling omitted for brevity)
sub, _ := CreateSubscription(..., 0 /* ack-deadline */)
iter, _ := sub.Pull(...)
for {
    // omitted: wait if we have too many workers
    msg, _ := iter.Next()
    go func(msg Message) {
        // omitted: handle the message and measure the latency;
        // it turned out the latency is almost always within 1 second
        msg.Done(true)
    }(msg)
}
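
For reference, the "wait if we have too many workers" part omitted above is conceptually a semaphore. A minimal sketch of that idea, following the pseudocode above (maxWorkers is an illustrative name, not our actual configuration), could look like this:

// sketch: limit the number of in-flight workers with a buffered channel
sem := make(chan struct{}, maxWorkers) // maxWorkers is illustrative
for {
    msg, _ := iter.Next()
    sem <- struct{}{} // blocks while maxWorkers handlers are already running
    go func(msg Message) {
        defer func() { <-sem }() // free a slot when the handler finishes
        // handle the message, then ACK
        msg.Done(true)
    }(msg)
}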

For load balancing, the subscription is also pulled by other pods in the same cluster. So, for one subscription (as in a Google Pub/Sub topic/subscription), we have multiple subscription objects (as in the Go binding's Subscription struct), each of which is used solely in one pod, and each subscription object creates one iterator. I believe this setup is not wrong, but please correct me if I'm wrong.
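
To illustrate the topology, each pod does roughly the following (a simplified sketch; projectID and subscriptionID are placeholders, error handling is omitted, and here client.Subscription obtains a handle to the already-created subscription rather than CreateSubscription as in the snippet above):

// per pod: one client, one Subscription object, one iterator (sketch)
client, _ := pubsub.NewClient(ctx, projectID)
sub := client.Subscription(subscriptionID) // the same Pub/Sub subscription in every pod
iter, _ := sub.Pull(ctx)
// the message-pumping loop shown above then consumes this iterator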

As the pump-loop snippet above shows, we do ACK. (Our server does not panic, so there is no code path that skips msg.Done().)

Attempts

The weird thing is that the problematic subscription is not a busy one. We usually don't have any problem with another subscription, in the same pod, that receives far more messages. So I began to wonder whether the max-prefetch option for the pull operation has an effect. Setting it seemed to fix the problem for a while, but the problem reoccurred.
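
For completeness, setting that option looks roughly like the following (a sketch with an illustrative value, assuming the MaxPrefetch pull option of the iterator-based API used above):

// sketch: cap how many messages the iterator prefetches
iter, _ := sub.Pull(ctx,
    pubsub.MaxPrefetch(10), // 10 is illustrative, not our exact setting
)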

I've also increased the number of pods, which effectively increases the number of workers, as advised by Google Support. This did not help much. Since we do not publish many messages to the problematic subscription (about 1 message/sec) and we have plenty of (probably too many) workers, I don't believe our server is overloaded.

Could someone shed some light on this?

I'm facing the same problem using the Node.js library; it started happening last week. The ack doesn't work, and then I have to wait for the message to be redelivered. - andresk

1 Answer

4 votes

In my case, the symptom that Ack did not return for some reason occurred regularly: no timeout was set on the gRPC invocation, and the 'acker' goroutine was blocking.

(screenshot)

So I solved it by passing gRPC options to pubsub.NewClient.

import (
  "time"

  "cloud.google.com/go/pubsub"
  "google.golang.org/api/option"
  "google.golang.org/grpc"
)

// ...

// Send the service config over a channel, as expected by the
// channel-based grpc.WithServiceConfig dial option.
scChan := make(chan grpc.ServiceConfig)
go func() {
    sc := grpc.ServiceConfig{
        Methods: map[string]grpc.MethodConfig{
            // apply a 5-second timeout to Acknowledge calls
            "/google.pubsub.v1.Subscriber/Acknowledge": {
                Timeout: 5 * time.Second,
            },
        },
    }
    scChan <- sc
}()

c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan)))

You can also investigate the cause by setting grpc.EnableTracing = true.

// enable gRPC tracing before creating the client
grpc.EnableTracing = true

c, err := pubsub.NewClient(ctx, project)
if err != nil {
    return nil, errors.Wrap(err, "pubsub.NewClient") // errors is github.com/pkg/errors
}

// expose the trace pages registered on http.DefaultServeMux
go func() {
    http.ListenAndServe(":8080", nil)
}()

The gRPC trace information can then be viewed through golang.org/x/net/trace; with the HTTP server above it is served on http.DefaultServeMux, so you can browse it at http://localhost:8080/debug/requests (and /debug/events).