3
votes

Trying to poll an Azure Service Bus Queue using a WebJob written in Node.js. I created 2 WebJobs. The first is on demand and sends 10 unique messages to the queue. The second job is continuous and polls the queue for messages.

Encountering the following issues:

  1. The polling is SLOW. It takes an average of about 10 minutes to receive 10 messages. See sample log details below. Basically unusable at this speed. All the delay is from getting a response from receiveQueueMessage. Response times vary from 0 seconds to ~120 seconds, with an average of 60 seconds.

  2. The messages are being received in a random order. Not FIFO.

  3. Sometimes messages are received twice, even though they are being read in ReceiveAndDelete mode (I have tried with no read mode parameter which should default to ReceiveAndDelete, with {isReceiveAndDelete:true} and with {isPeekLock:false} with the same results).

  4. When the queue is empty, it should keep the receive request open for a day, but it always returns with a no message error after 230 seconds. According to the documentation the max is 24 days so I don't know where 230 seconds is coming from:

The maximum timeout for a blocking receive operation in Service Bus queues is 24 days. However, REST-based timeouts have a maximum value of 55 seconds.

Basically nothing works as advertised. What am I doing wrong?

Send Message Test Job:

// Third-party modules: node-uuid supplies message ids, azure supplies the Service Bus client.
var uuid = require('node-uuid');
var azure = require('azure');
// Service Bus client built from the connection string in the busSearchConnectionString environment variable.
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
// Message-count limit that sendMessage checks before scheduling the next send.
var messagesToSend = 10;

// Kick off the send chain with message number 0.
sendMessage(0);

/**
 * Sends test message number `count` to the queue named by the
 * busSearchQueueName environment variable, then schedules the next message
 * five seconds later until `messagesToSend` messages have been sent in total.
 *
 * @param {number} count - Zero-based sequence number of the message to send.
 */
function sendMessage(count)
{
    var message = {
        body: 'test message',
        customProperties: {
            message_number: count,
            sent_date: new Date()
        },
        brokerProperties: {
            MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
        }
    };

    serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {
        if (!err) {
            console.log('sent test message number ' + count.toString());
        } else {
            console.error('error sending message: ' + err);
        }
    });

    //wait 5 seconds between sends so messages are submitted in sequence-number order
    //numbering is zero-based, so the last message is number messagesToSend - 1
    var nextCount = count + 1;
    if (nextCount < messagesToSend) {
        setTimeout(function(newCount) {
            //send next message
            sendMessage(newCount);
        }, 5000, nextCount);
    }
}

Receive Message Continuous Job:

// Startup marker written to the job log.
console.log('listener job started');
var azure = require('azure');
// Service Bus client built from the connection string in the busSearchConnectionString environment variable.
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
// Issue the first receive request; listenForMessages calls itself again after every response.
listenForMessages(serviceBus);

/**
 * Requests a single message from the queue named by the busSearchQueueName
 * environment variable in ReceiveAndDelete mode, logs the outcome, and then
 * issues the next receive request, so polling continues indefinitely.
 *
 * @param {object} serviceBus - Service Bus client exposing receiveQueueMessage.
 */
function listenForMessages(serviceBus)
{
    var start = process.hrtime();
    //Service Bus documentation caps REST-based receive timeouts at 55 seconds,
    //so request the maximum supported long poll and re-poll after each response
    var timeOut = 55;
    var options = {timeoutIntervalInS: timeOut, isReceiveAndDelete: true};

    serviceBus.receiveQueueMessage(process.env.busSearchQueueName, options, function(err, message) {

        //process.hrtime(start) yields [seconds, nanoseconds] elapsed since the request was issued
        var end = process.hrtime(start);
        console.log('received a response in %d seconds', end[0]);

        if (err) {
            //an empty queue is also reported through err ("No messages to receive")
            console.log('error requesting message: ' + err);
        } else if (hasMessageNumber(message)) {
            console.log('received test message number ' + String(message.customProperties.message_number));
        } else {
            console.log('invalid message received');
        }

        //every outcome is followed by the next poll
        listenForMessages(serviceBus);
    });
}

/**
 * Tells whether a received message carries a customProperties.message_number entry.
 *
 * @param {*} message - Value handed to the receive callback.
 * @returns {boolean} true when message.customProperties is an object containing message_number.
 */
function hasMessageNumber(message)
{
    if (message === null || typeof message !== 'object') {
        return false;
    }
    var props = message.customProperties;
    return props !== null && typeof props === 'object' && 'message_number' in props;
}

Sample Log Output:

[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds    
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
3

3 Answers

4
votes

The issue was that the queue I was using was partitioned (the default option when creating a queue in the Azure portal). Once I created a new queue that was not partitioned, everything worked as expected without the lag (other than the weird 230-second timeout on a long-poll attempt). So basically the node.js library doesn't work for partitioned queues. At all. I wasted many days figuring that one out, so I will leave this here for others.

1
votes

Switching off the partitioned flag of the Service Bus queue worked for me, too.

With the partitioned queue some messages had delays of more than 30 minutes. A simple DotNet webclient could download all messages without any delays. However, as soon as nodejs was supposed to download messages, only the first message would be downloaded without problems, afterwards delays showed up. Playing with nodejs to change the http agent options keepalive and socket timeout did not improve the situation.

After stopping nodejs, I had to wait several minutes before the DotNet client actually started working without problems. This was reproducible several times. I also found that the simple DotNet webclient program showed similar problems after being started and stopped several times in a row.

Anyway, your post showed me the solution: Turn off the partitioned flag :)

1
votes

Try using AMQP (via the amqp10 package) to read the messages off the Azure Service Bus partitioned queue. This works for a partitioned topic/queue, and you don't even have to poll much.

// Receives messages from an Azure Service Bus queue over AMQP using the amqp10 package.
const AMQPClient = require('amqp10').Client;
const Policy = require('amqp10').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'your_key_goes_here'; // placeholder: supply the real key from configuration; do not commit it
const serviceBusHost = 'namespace.servicebus.windows.net';
// Key name and key are URI-encoded so reserved characters in them cannot break the connection URI.
const uri = `${protocol}://${encodeURIComponent(keyName)}:${encodeURIComponent(sasKey)}@${serviceBusHost}`;
const queueName = 'partitionedQueueName';
const client = new AMQPClient(Policy.ServiceBusQueue);

client.connect(uri)
    .then(() => client.createReceiver(queueName))
    .then((receiver) => {
        console.log('--------------------------------------------------------------------------');
        receiver.on('errorReceived', (err) => {
            // check for errors
            console.log(err);
        });
        receiver.on('message', (message) => {
            console.log('Received message');
            console.log(message);
            console.log('----------------------------------------------------------------------------');
        });
    })
    .catch((e) => {
        // .catch handles every rejection; Bluebird's .error would only handle operational errors
        // and would leave any other failure in the chain unhandled.
        console.warn('connection error: ', e);
    });

https://www.npmjs.com/package/amqp10