1
votes

I have a Node.js Azure Function reading from a Storage Queue that is filled once a day. When all messages have been processed and the results are stored in a table, I'd like to call a final function that does some work with the gathered data.

What's the best way to achieve this? Can I get the remaining messages in the queue inside my nodejs bindings? Do I have to peek the queue through the nodejs azure sdk? Can I trigger a different function when the queue runs empty?

2
I have seen this one - so I'd have to add the SDK in addition to the queue bindings? It seems counterintuitive to basically double the requests to the queue just to check if it's empty. - Thomas
Can you send an "end" message as the last one in the batch? - Mikhail Shilkov
No, unfortunately I'm not able to tell which one will be the last. - Thomas
You can use three queues. The first queue holds a message of the form BatchUniqueGuid-BatchCount. The second queue holds the actual messages. Read the batch count from the first queue. Once all messages from the second queue have been processed, add a "batch complete" message to the third queue. Another function can monitor the third queue and do the work on the gathered data. This can be an option. - Baskar Rao

2 Answers

1
votes

This is a classic "join" problem. There are a few ways to handle this:

  1. Try the new Durable Functions preview ( https://azure.github.io/azure-functions-durable-extension/ ). It's designed for exactly this situation.
  2. At the end of each worker queue message, figure out if it's the "last" one. I.e., if you're writing unique rows to a table, just scan the table and see if it's complete. (This is obviously not feasible if it's a large table.)
  3. Maintain a counter. Each queue message decrements the counter. The worker that decrements it to 0 is the "last". This can be kind of a pain since there's not a great way to atomically decrement a shared counter. You could either do some ETag magic, or degenerate this into case #2 by having each worker write a row to a table.
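
To make the "ETag magic" in option 3 more concrete, here is a minimal sketch of an optimistic-concurrency decrement. Everything in it is an assumption for illustration: `EtagStore` is a hypothetical in-memory stand-in for a table entity or blob that supports conditional updates, and `decrementAndCheckLast` is a made-up helper name. A real version would replace the store with conditional writes against actual storage.

```javascript
// Hypothetical in-memory stand-in for a store with ETag semantics.
// A real implementation would use conditional updates on a table entity or blob.
class EtagStore {
  constructor(initialValue) {
    this.value = initialValue;
    this.etag = 1;
  }

  // Returns the current value together with its version tag.
  read() {
    return { value: this.value, etag: this.etag };
  }

  // Writes only if the caller's etag still matches; otherwise reports a conflict.
  replaceIfMatch(newValue, etag) {
    if (etag !== this.etag) {
      return false; // another worker updated the counter first
    }
    this.value = newValue;
    this.etag += 1;
    return true;
  }
}

// Decrements the shared counter using read / conditional write / retry.
// Resolves to true only for the caller that moved the counter to 0.
async function decrementAndCheckLast(store, maxRetries = 10) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const { value, etag } = await store.read();
    if (value <= 0) {
      throw new Error("counter is already at zero");
    }
    if (await store.replaceIfMatch(value - 1, etag)) {
      return value - 1 === 0;
    }
    // Conflict: another worker won the race, so re-read and try again.
  }
  throw new Error("too many concurrent updates, giving up");
}
```

Each worker would call `decrementAndCheckLast` after finishing its message, and only the worker that gets `true` back would kick off the final function. The counter has to be initialized to the batch size before processing starts, which assumes the batch size is known up front.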
1
votes

The problem in your scenario is how the function knows that the received message is the last one in the queue. There is no direct property or method that provides this, such as the number of visible messages in the queue. Your question is interesting: you are asking about a "trigger on empty". This kind of event is not as easy to implement in a queue as a "trigger on message".

However, in a distributed event-driven (serverless) architecture, a distributed Watchdog pattern can help. Each business processor (function, microservice, etc.) can fire a Watchdog Event such as a ScheduleMessage with a delay of, for instance, 30 seconds. Within this time the Watchdog Event can be canceled or retriggered; otherwise the Watchdog Event message becomes visible in the watchdog queue/topic entity.

Using the Watchdog in your case is straightforward. Your QueueTrigger function sends a Watchdog Event message to indicate that business processing is in progress. Once the business process has finished, the Watchdog is no longer retriggered and expires after its configured time (30 s), so the Watchdog Event message becomes visible to its reader/subscriber.

The following screen snippet shows this model:

[image: watchdog]

As you can see, the Watchdog is based on the Azure Service Bus ScheduleMessage/CancelScheduledMessage calls. There is nothing like a RetriggerScheduledMessage; retriggering a scheduled message in a Service Bus entity takes two steps: a CancelScheduledMessage call with the message's sequence number, followed by a ScheduleMessage call that creates a new message (with a new sequence number). These steps are handled in the QueueTrigger function that serves as the Watchdog, with a lease blob keeping a counter and the sequence number of the scheduled message.
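
Since the question is about Node.js, here is a rough sketch of that cancel-then-reschedule step in JavaScript. It is not the C# implementation mentioned in this answer. It assumes `sender` exposes `scheduleMessages(message, date)` and `cancelScheduledMessages(sequenceNumber)`, as the `ServiceBusSender` in `@azure/service-bus` does. The names `retriggerWatchdog` and `state` are hypothetical; `state` stands in for the lease-protected blob that keeps the last sequence number, and lease handling and error handling are left out.

```javascript
// Retriggers the watchdog: cancel the previously scheduled message (if any),
// then schedule a fresh one `timeoutMs` in the future.
// `state` is a hypothetical stand-in for the lease blob contents.
async function retriggerWatchdog(sender, state, timeoutMs = 30000, now = Date.now()) {
  if (state.sequenceNumber !== undefined) {
    // Step 1: cancel the pending watchdog message by its sequence number.
    await sender.cancelScheduledMessages(state.sequenceNumber);
  }

  // Step 2: schedule a new watchdog message; it gets a new sequence number.
  const fireAt = new Date(now + timeoutMs);
  const [sequenceNumber] = await sender.scheduleMessages(
    { body: { type: "watchdog" } }, // payload shape is an assumption
    fireAt
  );

  // Remember the new sequence number for the next retrigger.
  state.sequenceNumber = sequenceNumber;
  return sequenceNumber;
}
```

The QueueTrigger function would call `retriggerWatchdog` for every processed message. When no call arrives within the timeout, the last scheduled message becomes visible, and a function listening on the watchdog queue can start the final work.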

It would be nice if Azure Service Bus queue/topic entities had this kind of Watchdog built in, with the ability to retrigger an already scheduled message, so that the same sequence number could be kept for the whole business process/task.

Anyway, if you are interested in this Watchdog function, I can update my answer with its implementation in C#.