3
votes

I have queue-triggered functions in my Azure WebJobs. By default, when a function fails MaxDequeueCount times, the message is moved to the corresponding poison queue. I would like to modify the message after the error but before it is inserted into the poison queue. Example:

Initial message:

{ "Name":"Tom", "Age": 30 }

And upon failure I want to modify the message as follows and have the modified message be inserted into the poison queue:

{ "Name":"Tom", "Age": 30, "ErrorMessage":"Unable to find user" }

Can this be done?

2 Answers

1
votes

According to the WebJobs documentation, a message is moved to the poison queue after 5 failed attempts to process it:

The SDK will call a function up to 5 times to process a queue message. If the fifth try fails, the message is moved to a poison queue. The maximum number of retries is configurable.

Source: https://github.com/Azure/azure-webjobs-sdk/wiki/Queues#poison

This is the automatic behavior. However, you can catch exceptions inside your WebJobs function code (so the exception never leaves the function and the automatic poison message handling is not triggered) and write a modified message to the poison queue through an output binding.

Another option is to check the dequeueCount value, which tells you how many times the message has been picked up for processing.

You can get the number of times a message has been picked up for processing by adding an int parameter named dequeueCount to your function. You can then check the dequeue count in function code and perform your own poison message handling when the number exceeds a threshold, as shown in the following example.

public static void CopyBlob(
        [QueueTrigger("copyblobqueue")] string blobName, int dequeueCount,
        [Blob("textblobs/{queueTrigger}", FileAccess.Read)] Stream blobInput,
        [Blob("textblobs/{queueTrigger}-new", FileAccess.Write)] Stream blobOutput,
        TextWriter logger)
    {
        if (dequeueCount > 3)
        {
            logger.WriteLine("Failed to copy blob, name=" + blobName);
        }
        else
        {
            blobInput.CopyTo(blobOutput, 4096);
        }
    }

(Also taken from the link above.)

Your function signature could look like this (a single-message output binding has to be declared as an out parameter):

public static void ProcessQueueMessage(
            [QueueTrigger("myqueue")] CloudQueueMessage message,
            [Queue("myqueue-poison")] out CloudQueueMessage poisonMessage,
            TextWriter logger)
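
To make the catch-and-forward idea concrete, here is a minimal sketch rather than a definitive implementation. It assumes the message body is a flat JSON object, that MaxAttempts matches the configured Queues.MaxDequeueCount, and that leaving the out parameter null results in no poison message being written. HandleMessage is a hypothetical placeholder for your real processing, and the queue names are taken from the signature above.

```csharp
using System;
using System.IO;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class Functions
{
    // Assumption: keep this equal to config.Queues.MaxDequeueCount (5 by default).
    private const int MaxAttempts = 5;

    public static void ProcessQueueMessage(
        [QueueTrigger("myqueue")] string message,
        int dequeueCount,
        [Queue("myqueue-poison")] out string poisonMessage,
        TextWriter logger)
    {
        // Assumption: a null out value means no message is written to the poison queue.
        poisonMessage = null;

        try
        {
            HandleMessage(message);
        }
        // On earlier attempts the filter is false, so the exception leaves the
        // function and the SDK retries as usual. On the last attempt the
        // exception is swallowed and the modified message is emitted instead.
        catch (Exception ex) when (dequeueCount >= MaxAttempts)
        {
            JObject body = JObject.Parse(message);
            body["ErrorMessage"] = ex.Message;
            poisonMessage = body.ToString(Formatting.None);
            logger.WriteLine("Giving up after " + dequeueCount + " attempts: " + ex.Message);
        }
    }

    // Hypothetical placeholder for the real work; it always fails in this sketch.
    private static void HandleMessage(string message)
    {
        throw new InvalidOperationException("Unable to find user");
    }
}
```

Because the function returns normally on the last attempt, the SDK treats the original message as processed, so only the modified copy should end up in the poison queue.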
0
votes

The default maximum number of attempts is 5. You can also set this value yourself through the Queues.MaxDequeueCount property of the JobHostConfiguration instance, as shown below:

static void Main(string[] args)
{
    var config = new JobHostConfiguration();
    config.Queues.MaxDequeueCount = 5; // maximum number of attempts before the message is poisoned
    var host = new JobHost(config);
    host.RunAndBlock();
} 

Then you can update the failed queue message once the maximum number of attempts has been reached. To test this, you can bind to a blob container that does not exist, which makes the function fail and forces the retry mechanism. For example:

public static void ProcessQueueMessage(
    [QueueTrigger("queue")] CloudQueueMessage message,
    [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob,
    ILogger logger)
{
    // Name and value are quoted so that the result stays valid JSON.
    string yourUpdatedString = "\"ErrorMessage\":\"Unable to find user\"";
    string str1 = message.AsString;
    if (message.DequeueCount == 5) // last attempt; must match Queues.MaxDequeueCount
    {
        // Modify the failed message here. Replace touches every "}",
        // so this only works for flat JSON objects without nested braces.
        message.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}"));
    }
    // Accessing myBlob fails because the blob does not exist, which forces the retry.
    logger.LogInformation($"Blob name:{message.AsString} \n Size: {myBlob.Length} bytes");
}

Once this is in place, you can see the updated message in the queue-poison queue.
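
The string replacement above appends the new property before every closing brace, so it breaks as soon as the message contains a nested object. As a hedged alternative, the sketch below parses the body and adds the property with Json.NET instead. PoisonMessageHelper and AddErrorMessage are hypothetical names introduced only for illustration, and the sketch assumes the message body is a JSON object.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class PoisonMessageHelper
{
    // Returns the message JSON with an "ErrorMessage" property added or overwritten.
    public static string AddErrorMessage(string messageJson, string error)
    {
        JObject body = JObject.Parse(messageJson);
        body["ErrorMessage"] = error;
        return body.ToString(Formatting.None);
    }

    public static void Main()
    {
        string original = "{\"Name\":\"Tom\",\"Age\":30}";
        Console.WriteLine(AddErrorMessage(original, "Unable to find user"));
        // prints {"Name":"Tom","Age":30,"ErrorMessage":"Unable to find user"}
    }
}
```

The returned string can then be passed to message.SetMessageContent(...) in place of the Replace call.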

UPDATED:

Since CloudQueueMessage is a sealed class, we cannot inherit from it.

For your MySpecialPoco message, you can serialize it with JsonConvert.SerializeObject(message) and add the modified copy to the poison queue yourself, as shown below:

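using Microsoft.Azure;                      // CloudConfigurationManager
using Microsoft.WindowsAzure.Storage;       // CloudStorageAccount
using Microsoft.WindowsAzure.Storage.Queue; // CloudQueue, CloudQueueMessage
using Newtonsoft.Json;

// Counts attempts in this process. It is shared by all messages, so it is only
// reliable when a single message is being retried at a time.
static int number = 0;

public static void ProcessQueueMessage(
    [QueueTrigger("queue")] object message,
    [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob,
    ILogger logger)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("queue-poison"); // get the poison queue
    CloudQueueMessage msg1 = new CloudQueueMessage(JsonConvert.SerializeObject(message));
    number++;
    string yourUpdatedString = "\"ErrorMessage\"" + ":" + "\"Unable to find user\"";
    string str1 = msg1.AsString;

    if (number == 5) // last attempt; must match Queues.MaxDequeueCount
    {
        // Append the error to the serialized message and add it to the poison queue.
        msg1.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}"));
        queue.AddMessage(msg1);
        number = 0;
    }
    // Accessing myBlob fails because the blob does not exist, which forces the retry.
    logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
}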

The drawback is that both the original and the updated messages end up in the poison queue.