I've got a queue-triggered Azure WebJobs function:

public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    // Do normal processing...
}

I want to selectively ignore certain queue items based on a condition. For example, if I'm out of Widget X, I don't want to keep processing Widget X shipments until I have stock again, so I want to ignore/skip all Widget X queue items temporarily. Something like:

public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    if (!HaveStock(WidgetX))
    {
        // Ignore queue item - treat it as if it was not in the queue at all
    }

    // Do normal processing...
}

Throwing an exception to fail processing doesn't seem like an option to me because it'll poison the message.

I could set the VisibilityTimeout to some arbitrarily high value and then throw an exception. But then, when I wanted to resume processing (rather than just wait for the timeout to expire), a separate process would have to scan for queue items with the high VisibilityTimeout and lower it (I'm not even sure that's possible). Writing a process to scan queue items is exactly what I'm trying to avoid; I'd like to keep the logic in the queue-triggered function if possible.

What options do I have?

Update: I recognize that what I'm trying to accomplish is at odds with the purpose of queue-triggered functions, which exist to do something when a message appears. If I want selective processing, I could implement the queue polling and message handling myself, but I'm trying to take advantage of the higher level of abstraction that triggered processing provides.

1 Answer

I would say: let a queue be a queue, and let FIFO be FIFO.

If you want conditional message processing on top of a queue, I suggest two options.

1. Use Azure Service Bus Topics

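If you need selective, filtered FIFO processing, replacing the Azure Storage queue with an Azure Service Bus topic is a candidate. Each subscription on a topic can carry its own filter, so different kinds of messages can be consumed separately.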

See the tutorial to get a hint: https://azure.microsoft.com/en-us/documentation/articles/websites-dotnet-webjobs-sdk-service-bus/#topics

2. Use a secondary queue

Similar to the poison queue, send the ignored items to a secondary queue and process them later with separate logic. I recommend this option because the flow is easy to follow.

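// Needs usings for System.Threading.Tasks, Microsoft.WindowsAzure.Storage.Queue and Newtonsoft.Json.
// One-time setup, e.g. at startup; _secondaryQueue is a static CloudQueue field.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
_secondaryQueue = queueClient.GetQueueReference(storageQueueName);
_secondaryQueue.CreateIfNotExists();

// async Task (not void) so that await compiles and the SDK waits for the send to finish.
public static async Task ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    if (!HaveStock(WidgetX))
    {
        // Send to secondary queue; returning normally removes the item from the main queue.
        await _secondaryQueue.AddMessageAsync(new CloudQueueMessage(JsonConvert.SerializeObject(item)));
        return;
    }

    // Do normal processing...
}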
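
To make the "process them later" half of this option concrete, here is a minimal in-memory sketch of the whole flow. It is only an illustration under assumptions: `ParkingRouter`, `Handle`, `DrainSecondary` and the `Widget`/`OrderId` properties are hypothetical names, and plain `Queue<Item>` objects stand in for the real storage queues, so no Azure calls are made.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical item shape; only the widget name matters for the routing decision.
public class Item
{
    public string Widget { get; set; }
    public int OrderId { get; set; }
}

public class ParkingRouter
{
    // In-memory stand-in for the secondary storage queue.
    public Queue<Item> Secondary { get; } = new Queue<Item>();

    // Order ids that went through "normal processing".
    public List<int> Processed { get; } = new List<int>();

    // Stock check supplied by the caller (an assumption of this sketch).
    private readonly Func<string, bool> _haveStock;

    public ParkingRouter(Func<string, bool> haveStock)
    {
        _haveStock = haveStock;
    }

    // Mirrors the queue-triggered function: park the item or process it.
    public void Handle(Item item)
    {
        if (!_haveStock(item.Widget))
        {
            Secondary.Enqueue(item); // skipped for now, kept for later
            return;
        }

        Processed.Add(item.OrderId); // placeholder for normal processing
    }

    // The "later" step: re-run every parked item once.
    // Items that are still out of stock are parked again.
    // Returns how many parked items were processed in this pass.
    public int DrainSecondary()
    {
        int pending = Secondary.Count; // bound the loop to the current backlog
        int before = Processed.Count;

        for (int i = 0; i < pending; i++)
        {
            Handle(Secondary.Dequeue());
        }

        return Processed.Count - before;
    }
}
```

In a real job, `DrainSecondary` would correspond to whatever runs when stock comes back, for example a scheduled or manually triggered function that reads the secondary queue. Note that parked items are handled after items that arrived later, so strict FIFO order across widget types is not preserved.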