I am working on a sample project that takes output from BigQuery and publishes it to Pub/Sub. The BigQuery output can exceed 100,000 rows. I saw there are options to batch publish, and I've read in multiple places that 1,000 messages per batch is ideal. The issue I am running into is that, for the life of me, I can't get it to batch multiple messages. I think the solution is simple, but I'm missing how to do it.

Here is what I have right now and all it does is publish one message at a time.

func publish(client pubsub.Client, data []byte) (string, error) {
    ctx := context.Background()

    topic := client.Topic("topic-name")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000, // no matter what I put here it still sends one per publish
        // DelayThreshold: 1000 * time.Millisecond,
    }

    result := topic.Publish(ctx, &pubsub.Message{
        Data: data,
    })

    id, err := result.Get(ctx)
    if err != nil {
        return "", err
    }

    return id, nil
}

And this function is called by:

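for _, v := range qr {
    data, err := json.Marshal(v)
    if err != nil {
        // data is nil when Marshal fails, so log the row and the error instead
        log.Printf("Unable to marshal %v: %v", v, err)
        continue
    }
    id, err := publish(*pubsubClient, data)
    if err != nil {
        log.Printf("Unable to publish message %s: %v", data, err)
        continue // don't report a failed publish as successful
    }
    log.Printf("Published message with id: %s", id)
}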

Here, qr is a slice of structs containing the rows returned by the BigQuery query.

Is it the way I call publish that causes each message to be sent individually? Are the topic.PublishSettings being overwritten on every call, so that the previously queued messages are forgotten? I'm at a loss here.

I saw some of the batch publishing code here: https://github.com/GoogleCloudPlatform/golang-samples/blob/master/pubsub/topics/main.go#L217

But the sample never actually calls it, so I can't tell how it is meant to be used.

As a side note, and as further evidence that batching isn't happening: if I set DelayThreshold in topic.PublishSettings to, say, 1 second, it simply publishes one message every second instead of all the messages that should be buffered in memory.

Appreciate the help, thanks.

EDIT #1:

Following kingkupps' comment, I changed the code to the following for testing purposes (project and topic names have been replaced with fake ones):

func QueryAndPublish(w http.ResponseWriter, r *http.Request) {
    ctx := context.Background()
    // setting up the pubsub client
    pubsubClient, err := pubsub.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get pubsub client: %v", err)
    }

    // init topic and settings for publishing 1000 messages in batch
    topic := pubsubClient.Topic("fake-topic")
    topic.PublishSettings = pubsub.PublishSettings{
        // ByteThreshold:  5000,
        CountThreshold: 1000,
        // DelayThreshold: 1000 * time.Millisecond,
    }

    // bq set up
    bqClient, err := bigquery.NewClient(ctx, "fake-project-id")
    if err != nil {
        log.Fatalf("Unable to get bq client: %v", err)
    }
    // bq query function call
    qr, err := query(*bqClient)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Got query results, publishing now")

    // marshalling messages to json format; rows that fail are skipped, not left as nil entries
    messages := make([][]byte, 0, len(qr))
    timeToMarshal := time.Now()
    for _, v := range qr {
        data, err := json.Marshal(v)
        if err != nil {
            // data is nil when Marshal fails, so log the row and the error instead
            log.Printf("Unable to marshal %v: %v", v, err)
            continue
        }
        messages = append(messages, data)
    }
    elapsedMarshal := time.Since(timeToMarshal).Milliseconds()
    log.Printf("Took %v ms to marshal %v messages", elapsedMarshal, len(messages))

    // publishing messages
    timeToPublish := time.Now()
    publishCount := 0
    for _, v := range messages {
        // ignore result, err from topic.Publish return, just publish
        topic.Publish(ctx, &pubsub.Message{
            Data: v,
        })
        publishCount++
    }
    elapsedPublish := time.Since(timeToPublish).Milliseconds()
    log.Printf("Took %v ms to publish %v messages", elapsedPublish, publishCount)

    fmt.Fprint(w, "Job completed")
}

When my message count is 100,000, the loop of topic.Publish calls now finishes in roughly 600 ms. In the background, however, the client still publishes the messages one by one to the Pub/Sub endpoint.

I can see this in both Stackdriver and Wireshark. Stackdriver shows roughly 10-16 messages per second, and Wireshark shows a new connection for every message sent.

Have you tried using the same pubsub.Topic to publish all your messages? I wonder if each Topic has its own message queue. The examples are sort of unclear about this, but godoc seems to suggest each Topic has its own pool of resources. godoc.org/cloud.google.com/go/pubsub#hdr-Publishing - kingkupps
@kingkupps I moved the publish code into the calling function, more or less. The topic is defined right after the client is created, and the for loop now uses that same topic, but the result is the same. I also removed the result.Get(ctx) call. That only made the function return quickly, because Get blocks while topic.Publish does not, and the messages are then simply published in the background. I also checked my network traffic with Wireshark, and it seems to show a connection request for every message sent. - Tony
Edited post to show new code snippet. - Tony

1 Answer

This is likely because when you call

topic.PublishSettings = pubsub.PublishSettings{
    // ByteThreshold:  5000,
    CountThreshold: 1000,
    // DelayThreshold: 1000 * time.Millisecond,
}

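you are resetting the publish settings to a zero-initialized struct, because every field you leave out of the literal gets its zero value. In particular, this sets topic.PublishSettings.ByteThreshold to 0, which means every message is published immediately. You told the client to send a bundle once it holds at least 0 bytes, and every bundle always meets that condition.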

Instead, you should do the following to set CountThreshold:

topic.PublishSettings.CountThreshold = 1000

The same applies to the other fields. They are already initialized to default values as described here. If you want to change them, modify the individual fields directly instead of reassigning the entire PublishSettings struct.