13
votes

I'm using Google's Pub/Sub queue to handle messages between services. Some of the subscribers connect to rate-limited APIs.

For example, I'm pushing street addresses onto a Pub/Sub topic. I have a Cloud Function which subscribes (via push) to that topic and calls out to an external rate-limited geocoding service. Ideally, my street addresses could be pushed onto the topic with no delay, and the topic would retain those messages, calling the subscriber in a rate-limited fashion.

Is there any way to configure such a delay, or a message distribution rate limit? Increasing the ack window doesn't really help: I've architected this system to prevent long-running functions.

1
I've run into rate-limiting issues with Pub/Sub and Cloud Functions. The solution has been to create a record of jobs to be done in Datastore, then create a Cloud Function on a scheduler that pulls n objects from Datastore and executes a Cloud Function for them. You're effectively creating a queuing system. It's a pain to update the Datastore objects with each step in the process, but it also makes it easy to visualize the pipeline. - ethanenglish
One year on, any update? - slideshowp2
@slideshowp2 Google does not seem interested in supporting this use case, no update. - chris stamper
You could try pulling from Pub/Sub periodically: with Cloud Scheduler you can set a cron job to trigger a Cloud Function that pulls a limited number of messages from the Pub/Sub subscription and calls your rate-limited API. - Jonathan Lin
Three years later, the above seems to be the right approach (manually pulling from the queue at regular intervals via a cron job). - chris stamper
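
The scheduled-pull idea from the comments above could look roughly like the sketch below. It is only an outline under assumptions: `pullBatch` and `callApi` are hypothetical stand-ins (not functions of any Google client library) for "pull up to n messages from the subscription" and "call the rate-limited geocoding service", and each message is assumed to expose `data` and an `ack()` method.

```javascript
// Hypothetical sketch: handle at most `limit` messages per scheduled run.
// `pullBatch(limit)` is assumed to resolve to an array of { data, ack } objects;
// `callApi(data)` is assumed to wrap the rate-limited external API.
async function processBatch(pullBatch, callApi, limit) {
  const messages = await pullBatch(limit);
  let processed = 0;
  // slice() guards against a pull that returns more than was asked for
  for (const message of messages.slice(0, limit)) {
    await callApi(message.data); // one call at a time keeps each run's burst bounded
    await message.ack();         // ack only after the API call has succeeded
    processed += 1;
  }
  return processed;
}
```

With a cron interval of one minute and `limit` set to 50, for example, the external API would see at most 50 calls per minute from this function, and unpulled messages simply stay on the subscription until a later run.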

1 Answer

3
votes

One approach to this problem is to use async.queue.

It takes a concurrency argument, which caps how many tasks run in parallel and which you can use to manage the rate limit.

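var async = require('async');

// create a queue object with concurrency 2:
// at most two tasks are processed at the same time
var q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback(); // signal that this task is done
}, 2);

// assign a callback that runs once the queue is empty
// (async v3 syntax; in async v2 this was `q.drain = function() { ... }`)
q.drain(function() {
    console.log('all items have been processed');
});

// add some items to the queue
q.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});

// adapted from the async documentation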
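
Note that a concurrency setting limits how many tasks are in flight at once, not how many are started per second. If the external API's limit is expressed as requests per second, the worker would also need a time-based check. Below is a minimal sketch of a token bucket, which is a different technique from async.queue's concurrency option; the `TokenBucket` name, its parameters, and the injectable clock are all illustrative assumptions, not part of the async library.

```javascript
// Illustrative token bucket: allows bursts of up to `capacity` calls,
// refilled at `refillPerSecond` tokens per second.
class TokenBucket {
  constructor(capacity, refillPerSecond, now = Date.now) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.now = now;          // clock returning milliseconds; injectable for testing
    this.tokens = capacity;  // start with a full bucket
    this.last = now();
  }

  // Returns true and consumes a token if a call is allowed right now.
  tryRemove() {
    const current = this.now();
    const elapsedSeconds = (current - this.last) / 1000;
    // Add the tokens earned since the last check, never exceeding capacity.
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.last = current;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

A queue worker could call `tryRemove()` before each API request and, when it returns false, wait briefly and retry instead of calling the API.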