2
votes

I am using Azure Functions with a queue trigger.

When the function runs, it takes an item off the Azure Queue. However, sometimes my processing will fail and in that case, I want to requeue the item and have it processed later.

How can one achieve that in Azure Functions and using Node.js?

Have you tried just throwing an exception? - Henry Been
@HenryBeen Uhm, I haven't. If I throw an exception, the function keeps running until the 5-minute time limit. I'll try, thanks. - user6269864
The function will try to process the message 5 times by default and then move it to the poison queue, removing it from the original queue. If you want to control how many times the function retries, you can set the maxDequeueCount property in host.json. docs.microsoft.com/en-us/azure/azure-functions/… - Baskar Rao
I would love to have that functionality. Say we process a queue message and it fails: put it back on the queue, invisible for a few hours. Once it shows up again, try to process it again. At some point, if the dequeue count reaches, say, 5, report success and the queue message will be deleted. Grr, I wish this was available. We need a way to control whether a message can be put back on the queue. - dotsa
@dotsa This is already possible. I've posted a code sample below. - user6269864
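
The retry behavior described in these comments can be sketched in Node.js as follows. This is a minimal sketch, not a definitive implementation: it assumes the programming model where the handler is exported from index.js, `processItem` is a hypothetical placeholder for the real work, and the host.json values in the comment are example values. When the handler's promise rejects, the runtime does not delete the message, so it becomes visible again after `visibilityTimeout` and is retried until `maxDequeueCount` is reached.

```javascript
// host.json (example values, shown here as a comment):
// {
//   "version": "2.0",
//   "extensions": {
//     "queues": {
//       "maxDequeueCount": 5,
//       "visibilityTimeout": "00:10:00"
//     }
//   }
// }

// Hypothetical placeholder for the real work; fails for items flagged as bad.
async function processItem(item) {
    if (item && item.bad) {
        throw new Error('Processing failed');
    }
}

// Queue-triggered handler (index.js).
async function handler(context, myQueueItem) {
    try {
        await processItem(myQueueItem);
    } catch (error) {
        context.log('Processing failed, letting the runtime retry:', error.message);
        // Rethrow so the invocation is reported as failed and the message is retried.
        throw error;
    }
}

module.exports = handler;
```

Making the handler `async` and rethrowing lets the invocation end immediately with a failure instead of running until the time limit.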

2 Answers

2
votes

The workaround I used was to add an output binding to the same queue that triggers the function.

    [FunctionName("FunctionsQueueTrigger")]
    public static async Task Run(
        [QueueTrigger("101functionsqueue")] string myQueueItem,
        [Queue("101functionsqueue")] IAsyncCollector<string> myQueue,
        ILogger log)

In my case, I wanted to put the item back to the queue if a storage exception was thrown with the code 412 (Precondition Failed).

    catch (StorageException ex)
    {
        if (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.PreconditionFailed) // 412
        {
            log.LogWarning($"Putting item back to queue due to {ex.Message} error.");
            await myQueue.AddAsync(myQueueItem);
        }
    }

In the end, the function completes without an error, so the current item is removed from the queue, but a new one with the same content is added.

It may not be the best solution, but it worked fine for me.
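
Since the question asks about Node.js, the same idea might look like the sketch below. It assumes a function.json that declares a queue output binding pointing at the same queue as the trigger; the binding name `requeue`, the `doSomeWork` placeholder, and the `statusCode` property on the error are all hypothetical and only there for illustration.

```javascript
// function.json (assumed; both bindings point at the same queue):
// {
//   "bindings": [
//     { "name": "myQueueItem", "type": "queueTrigger", "direction": "in",
//       "queueName": "101functionsqueue", "connection": "AzureWebJobsStorage" },
//     { "name": "requeue", "type": "queue", "direction": "out",
//       "queueName": "101functionsqueue", "connection": "AzureWebJobsStorage" }
//   ]
// }

// Hypothetical placeholder for the real processing step.
async function doSomeWork(item) {
    if (item && item.fail) {
        const error = new Error('Precondition Failed');
        error.statusCode = 412;
        throw error;
    }
}

async function handler(context, myQueueItem) {
    try {
        await doSomeWork(myQueueItem);
    } catch (error) {
        if (error.statusCode === 412) {
            context.log.warn(`Putting item back on the queue due to error: ${error.message}`);
            // Assigning to the output binding enqueues a new message
            // once the function completes.
            context.bindings.requeue = myQueueItem;
        } else {
            // Anything else is rethrown so the runtime's own retry logic applies.
            throw error;
        }
    }
}

module.exports = handler;
```

Note that the re-added message is a new message with a fresh dequeue count, so an item that always fails this way would be requeued indefinitely unless you track the attempts yourself.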

-1
votes

In case someone is looking for an answer later, here's a code sample that uses the imago-azure-storage package:

const { Queues } = require('imago-azure-storage');
const q = new Queues(storageAccount, storageKey);

const QUEUE_NAME = 'testqueue';

(async() => {
    // Create the queue if it does not exist yet:
    await q.initializeQueues([QUEUE_NAME]);

    // Write a sample item to the queue:
    await q.put(QUEUE_NAME, { test: 12345 });

    // Retrieve up to 10 items, and if any of them fail,
    // retry processing them after 3600 seconds:
    let items = await q.fetch(QUEUE_NAME, 10, 3600);
    for (const { item, message } of items) {
        try {
          console.log(item);
          await doSomeWork(item); // <-- your function here

          // Item processed successfully; delete it from the queue:
          await q.commit(message);
        } catch (error) {
          // Ignore errors, the item will be requeued automatically
          // in 3600 seconds.
        }
    }
})();
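
The same fetch-then-commit pattern could probably also be written against the official @azure/storage-queue package. The sketch below is an assumption-laden illustration, not a drop-in replacement: `processBatch` is a hypothetical helper, the client is passed in as a parameter, and message bodies are assumed to be plain JSON text.

```javascript
// `queueClient` is expected to behave like QueueClient from @azure/storage-queue, e.g.:
//   const { QueueClient } = require('@azure/storage-queue');
//   const queueClient = new QueueClient(connectionString, 'testqueue');
async function processBatch(queueClient, doSomeWork, retryDelaySeconds = 3600) {
    // Received messages stay invisible to other consumers for retryDelaySeconds.
    const response = await queueClient.receiveMessages({
        numberOfMessages: 10,
        visibilityTimeout: retryDelaySeconds,
    });

    let succeeded = 0;
    for (const message of response.receivedMessageItems) {
        try {
            // Assumes the message body is plain JSON text.
            await doSomeWork(JSON.parse(message.messageText));
            // Processed successfully: delete the message from the queue.
            await queueClient.deleteMessage(message.messageId, message.popReceipt);
            succeeded += 1;
        } catch (error) {
            // Leave the message alone; it becomes visible again
            // once the visibility timeout expires.
        }
    }
    return succeeded;
}
```

Each received message also carries a `dequeueCount`, which could be checked to give up on an item after a chosen number of attempts.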