I'm trying to write a simplified example demonstrating Google Pub/Sub's message ordering feature (https://cloud.google.com/pubsub/docs/ordering). According to those docs, once message ordering is enabled for a subscription:
"After the message ordering property is set, the Pub/Sub service delivers messages with the same ordering key in the order that the Pub/Sub service receives the messages. For example, if a publisher sends two messages with the same ordering key, the Pub/Sub service delivers the oldest message first."
I've used this to write the following example:
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
	uuid "github.com/satori/go.uuid"
)

func main() {
	client, err := pubsub.NewClient(context.Background(), "my-project")
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}

	topicID := "test-topic-" + uuid.NewV4().String()
	topic, err := client.CreateTopic(context.Background(), topicID)
	if err != nil {
		log.Fatalf("CreateTopic: %v", err)
	}
	defer topic.Delete(context.Background())

	subID := "test-subscription-" + uuid.NewV4().String()
	sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		EnableMessageOrdering: true,
	})
	if err != nil {
		log.Fatalf("CreateSubscription: %v", err)
	}
	defer sub.Delete(context.Background())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	messageReceived := make(chan struct{})
	go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
		msg.Ack()
		messageReceived <- struct{}{}
	})

	topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
	topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

	for i := 0; i < 2; i++ {
		select {
		case <-messageReceived:
		case <-time.After(10 * time.Second):
			log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
		}
	}
}
First, I tried the program without specifying OrderingKey: "foobar" in the topic.Publish() calls. This resulted in the following output:
> go run main.go
2020/08/10 21:40:34 Received message with ordering key : Dang2!
2020/08/10 21:40:34 Received message with ordering key : Dang1!
In other words, the messages are not received in the order in which they were published, which is undesirable in my use case and which I'd like to prevent by specifying an OrderingKey.
However, as soon as I added the OrderingKeys in the publish calls, the program times out after 10 seconds of waiting to receive Pub/Sub messages:
> go run main.go
2020/08/10 21:44:36 Expected to receive a message, but timed out after 10 seconds.
exit status 1
What I would expect is to now first receive the message Dang1! followed by Dang2!, but instead I'm not receiving any messages. Any idea why this is not happening?
[...] sub.Receive function to see if there is any error. – whitespace
[...] topic.Publish method. – Ilya