I have implemented my own QueueProcessorFactory, and it works fine except for one thing I can't wrap my head around. After a message has been tried 5 times (the default), CopyMessageToPoisonQueueAsync runs, followed by DeleteMessageAsync.

So far so good, but 10 minutes later the message appears in the queue again with a dequeue count of 5, even though it is also in the poison queue. The same procedure then repeats: CopyMessageToPoisonQueueAsync, then DeleteMessageAsync, which adds another item to the poison queue that is identical to the one already copied. Ten minutes later it happens again, this time with a dequeue count of 6. Should I change the ExpirationTime and set it to now when deleting, or am I missing something else?

This is my code:

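using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.WindowsAzure.Storage.Queue;

class Program
{
    static void Main()
    {
        var config = new JobHostConfiguration();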
        config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
        config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();

        var host = new JobHost(config);
        host.RunAndBlock();
    }
}

public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
    public List<CustomQueueProcessor> CustomQueueProcessors = new List<CustomQueueProcessor>();
    public QueueProcessor Create(QueueProcessorFactoryContext context)
    {
        CustomQueueProcessor processor = new CustomQueueProcessor(context);
        CustomQueueProcessors.Add(processor);
        return processor;
    }

    public class CustomQueueProcessor : QueueProcessor
    {
        public CustomQueueProcessor(QueueProcessorFactoryContext context)
            : base(context)
        {
        }
        public override Task<bool> BeginProcessingMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
        {
            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }
        public override Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            return base.CompleteProcessingMessageAsync(message, result, cancellationToken);
        }

        protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
        {
            return base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);
        }

        protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
        {
            return base.DeleteMessageAsync(message, cancellationToken);
        }
        protected override async Task ReleaseMessageAsync(CloudQueueMessage message, FunctionResult result, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
        {
            visibilityTimeout = TimeSpan.FromSeconds(2);
            await base.ReleaseMessageAsync(message, result, visibilityTimeout, cancellationToken);
        }
    }
}

If I add some Console.WriteLine calls, I get this output:

Start omitted because of repetition . . .

BeginProcessingMessageAsync message: d3c88182-ff39-4f81-8c29-b4ce0b2062ad dequeue count: 5 Date: 2017-06-26 13:33:42
Executing 'Functions.ProcessQueueMessage' (Reason='New queue message detected on '01testqueue'.', Id=17405a55-6d28-48b2-a874-718c0b741f61)
Test QueueProcessorFactory
Exception while executing function: Functions.ProcessQueueMessage
Microsoft.Azure.WebJobs.Host.FunctionInvocationException: Exception while executing function: Functions.ProcessQueueMessage ---> System.Exception: Derp!
   at WebJobTest1.Functions.ProcessQueueMessage(String message, TextWriter log)

...message omitted....

CompleteProcessingMessageAsync message: d3c88182-ff39-4f81-8c29-b4ce0b2062ad dequeue count: 5
CopyMessageToPoisonQueueAsync message: d3c88182-ff39-4f81-8c29-b4ce0b2062ad dequeue count: 5
Message has reached MaxDequeueCount of 5. Moving message to queue '01testqueue-poison'.
CopyMessageToPoisonQueueAsync message: da643007-954a-4296-9e9a-54ebb0aec6c5 dequeue count: 5

Ten minute waiting time:

BeginProcessingMessageAsync message: d3c88182-ff39-4f81-8c29-b4ce0b2062ad dequeue count: 6 Date: 2017-06-26 13:43:46
Executing 'Functions.ProcessQueueMessage' (Reason='New queue message detected on '01testqueue'.', Id=c22fe457-cb70-4cc8-a8a4-550cd44a8345)
Test QueueProcessorFactory
Exception while executing function: Functions.ProcessQueueMessage
CompleteProcessingMessageAsync message: d3c88182-ff39-4f81-8c29-b4ce0b2062ad dequeue count: 6
CopyMessageToPoisonQueueAsync message: d3c88182-ff39-4f81-8c29-b4ce0b2062ad dequeue count: 6
Message has reached MaxDequeueCount of 5. Moving message to queue '01testqueue-poison'.
CopyMessageToPoisonQueueAsync message: c43fd9fb-c2d7-4745-91ae-33cdc407ede6 dequeue count: 6

1 Answer

Based on your description, I assume this is due to the known issue that occurs when using Storage SDK 8.x with the WebJobs SDK. Here are some similar issues:

Based on my testing, this issue has not been fixed yet. You could downgrade the Storage SDK or change CopyMessageToPoisonQueueAsync as follows:

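protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
    // Build a separate message object that carries the same Id, PopReceipt and content.
    var newMessage = new CloudQueueMessage(message.Id, message.PopReceipt);
    newMessage.SetMessageContent(message.AsBytes);

    // Hand the copy to the base implementation, so the original message object
    // is left untouched for the DeleteMessageAsync call that follows.
    return base.CopyMessageToPoisonQueueAsync(newMessage, poisonQueue, cancellationToken);
}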
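
If you want to check whether this is what happens in your setup, the following is a minimal diagnostic sketch, not a fix. It rests on an assumption that I infer from your log, where the second CopyMessageToPoisonQueueAsync line prints a different message id (da643007-...) than the first (d3c88182-...): with Storage SDK 8.x, adding the message to the poison queue seems to overwrite the Id and PopReceipt on the same CloudQueueMessage object, so the DeleteMessageAsync that follows no longer targets the original message. The class name DiagnosticQueueProcessor is hypothetical; everything else uses the same types as your code.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.WindowsAzure.Storage.Queue;

// Hypothetical name; return an instance of this from your IQueueProcessorFactory.Create.
public class DiagnosticQueueProcessor : QueueProcessor
{
    public DiagnosticQueueProcessor(QueueProcessorFactoryContext context)
        : base(context)
    {
    }

    protected override async Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
    {
        // Remember the identity of the message as it was dequeued from the original queue.
        string idBefore = message.Id;
        string popReceiptBefore = message.PopReceipt;

        await base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);

        // If either value differs here, the later delete is issued with the wrong identity.
        Console.WriteLine("Id before copy: {0}, after copy: {1}", idBefore, message.Id);
        Console.WriteLine("PopReceipt changed: {0}", popReceiptBefore != message.PopReceipt);
    }
}

If the two ids differ, passing a copy of the message to the base method, as in the override above, should keep the original object intact for the delete.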