0
votes

Using a google cloud platform to implement pubsub model and using functions to create a topic, subscriber, publish and pullmsg function.

func pullMsgs(projectID, subID string, jsonPath string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(jsonPath))
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }

    // Consume 10 messages.
    var mu sync.Mutex
    received := 0
    sub := client.Subscription(subID)
    cctx, cancel := context.WithCancel(ctx)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
        mu.Lock()
        defer mu.Unlock()
        // fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
        fmt.Println("Got message: n", string(msg.Data))
        msg.Ack()
        received++
        if received == 10 {
            cancel()
        }
    })
    if err != nil {
        return fmt.Errorf("Receive: %v", err)
    }
    return nil
}

pullmsg function uses subscription Id to get published message from the publisher. Assume that model has 3 subscriber for the particular topic. if the publisher publishes msg for that topic. pullmsg function has to be executed 3 times to get that message for all subscribers. Is there any method to get published message to all subscribers at a single shot.

1
By design, each subscription (and hence each subscriber) will receive a different copy of the same message. Can you clarify if the 3 subscribers are in the same process? If so, is there a reason that 1 subscriber is not sufficient?TimeString

1 Answers

2
votes

I do not see why you exactly want to pull from 3 subscriptions at once. Since those messages may not arrive at/around the same time. So if you want to combine those messages, your code has to wait until it received all the messages. That is not really the purpose of Pub/Sub. On the other hand, if you do not want to combine the three messages, I would recommend you create a separate piece of code for every subscription.

That being said, you could pull all the messages using a synchronous pull mechanism or an asynchronous pull mechanism. If you do not want your code to be blocking so that it basically listens to all subscriptions at the same time, you could use an asynchronous pull.

A code flow could be:

  • Create a non-blocking subscriber function.
  • Let this pull function stop when x messages have arrived (break a while loop).
  • Create three instances of the function passing your three topic ID's.
  • Create another function for you processing.

More information can be found here.