0
votes

I'm trying to make a simplified example demonstrating the use of Google Pub/Sub's message ordering feature (https://cloud.google.com/pubsub/docs/ordering). From those docs, after message ordering is enabled for a subscription,

After the message ordering property is set, the Pub/Sub service delivers messages with the same ordering key in the order that the Pub/Sub service receives the messages. For example, if a publisher sends two messages with the same ordering key, the Pub/Sub service delivers the oldest message first.

I've used this to write the following example:

package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
    uuid "github.com/satori/go.uuid"
)

func main() {
    client, err := pubsub.NewClient(context.Background(), "my-project")
    if err != nil {
        log.Fatalf("NewClient: %v", err)
    }

    topicID := "test-topic-" + uuid.NewV4().String()
    topic, err := client.CreateTopic(context.Background(), topicID)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    defer topic.Delete(context.Background())

    subID := "test-subscription-" + uuid.NewV4().String()
    sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
        Topic:                 topic,
        EnableMessageOrdering: true,
    })
    if err != nil {
        log.Fatalf("CreateSubscription: %v", err)
    }
    defer sub.Delete(context.Background())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    messageReceived := make(chan struct{})
    go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
        msg.Ack()
        messageReceived <- struct{}{}
    })

    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

    for i := 0; i < 2; i++ {
        select {
        case <-messageReceived:
        case <-time.After(10 * time.Second):
            log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
        }
    }
}

First, I tried the program without specifying OrderingKey: "foobar" in the topic.Publish() calls. This resulted in the following output:

> go run main.go
2020/08/10 21:40:34 Received message with ordering key : Dang2!
2020/08/10 21:40:34 Received message with ordering key : Dang1!

In other words, messages are not received in the same order as they were published in, which in my use case is undesirable and I'd like to prevent by specifying an OrderingKey

However, as soon as I added the OrderingKeys in the publish calls, the program times out after 10 seconds of waiting to receive Pub/Sub messages:

> go run main.go
2020/08/10 21:44:36 Expected to receive a message, but timed out after 10 seconds.
exit status 1

What I would expect is to now first receive the message Dang1! followed by Dang2!, but instead I'm not receiving any messages. Any idea why this is not happening?

2
have you tried, checking the sub.Receive function to see if there is any error.whitespace
You should also check the result of topic.Publish method.Ilya

2 Answers

2
votes

The publishes are failing with the following error: Failed to publish: Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering.

You can see this if you change your publish calls to check the error:

res1 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
res2 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

_, err = res1.Get(ctx)
if err != nil {
    fmt.Printf("Failed to publish: %v", err)
    return
}

_, err = res2.Get(ctx)
if err != nil {
    fmt.Printf("Failed to publish: %v", err)
    return
}

To fix it, add a line to enable message ordering on your topic. Your topic creation would be as follows:

topic, err := client.CreateTopic(context.Background(), topicID)
if err != nil {
    log.Fatalf("CreateTopic: %v", err)
}
topic.EnableMessageOrdering = true
defer topic.Delete(context.Background())
0
votes

I independently came up with the same solution as Kamal, just wanted to share the full revised implementation:

package main

import (
    "context"
    "flag"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
    uuid "github.com/satori/go.uuid"
)

var enableMessageOrdering bool

func main() {
    flag.BoolVar(&enableMessageOrdering, "enableMessageOrdering", false, "Enable and use Pub/Sub message ordering")
    flag.Parse()

    client, err := pubsub.NewClient(context.Background(), "fleetsmith-dev")
    if err != nil {
        log.Fatalf("NewClient: %v", err)
    }

    topicID := "test-topic-" + uuid.NewV4().String()
    topic, err := client.CreateTopic(context.Background(), topicID)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    topic.EnableMessageOrdering = enableMessageOrdering
    defer topic.Delete(context.Background())

    subID := "test-subscription-" + uuid.NewV4().String()
    sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
        Topic:                 topic,
        EnableMessageOrdering: enableMessageOrdering,
    })
    if err != nil {
        log.Fatalf("CreateSubscription: %v", err)
    }
    defer sub.Delete(context.Background())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    messageReceived := make(chan struct{})
    go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
        msg.Ack()
        messageReceived <- struct{}{}
    })

    msg1, msg2 := &pubsub.Message{Data: []byte("Dang1!")}, &pubsub.Message{Data: []byte("Dang2!")}
    if enableMessageOrdering {
        msg1.OrderingKey, msg2.OrderingKey = "foobar", "foobar"
    }
    publishMessage(topic, msg1)
    publishMessage(topic, msg2)

    for i := 0; i < 2; i++ {
        select {
        case <-messageReceived:
        case <-time.After(10 * time.Second):
            log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
        }
    }
}

func publishMessage(topic *pubsub.Topic, msg *pubsub.Message) {
    publishResult := topic.Publish(context.Background(), msg)
    messageID, err := publishResult.Get(context.Background())
    if err != nil {
        log.Fatalf("Get: %v", err)
    }
    log.Printf("Published message with ID %s", messageID)
}

When called with the enableMessageOrdering flag set to true, I receive Dang1! first, followed by Dang2!:

> go run main.go --enableMessageOrdering
2020/08/11 05:38:07 Published message with ID 1420685949616723
2020/08/11 05:38:08 Published message with ID 1420726763302425
2020/08/11 05:38:09 Received message with ordering key foobar: Dang1!
2020/08/11 05:38:11 Received message with ordering key foobar: Dang2!

whereas without it, I receive them in reverse order as before:

> go run main.go
2020/08/11 05:38:47 Published message with ID 1420687395091051
2020/08/11 05:38:47 Published message with ID 1420693737065665
2020/08/11 05:38:48 Received message with ordering key : Dang2!
2020/08/11 05:38:48 Received message with ordering key : Dang1!