2
votes

I am trying to implement the following use case. I have an Azure Worker Role that monitors an Azure Storage Queue; when a message comes in, it should trigger a job that runs asynchronously. I want to use the TPL if possible, and the operations need to support cancellation so that when the role's OnStop fires, jobs can exit gracefully. The MyFixIt example posted by Scott Guthrie is almost exactly what I need, and I have used it as the template for my project. The one critical aspect it does not support is running jobs concurrently: in the FixIt code, once a job is launched, no other job is processed until that one finishes. Some of the jobs my application will process are long-running, and I need the worker role to notice other incoming jobs and run them while the long-running job is still in progress.

The two key methods here are ProcessMessagesAsync, which monitors the queue, and ProcessMessage, which runs the job when a message comes in. Here is what I have. It mostly works, except that it does not handle the cancellation request properly: the Azure Worker Role shuts down without waiting for jobs to complete.

    /// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);

        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                ProcessMessage(message, queue, token);
            }
            else
            {
                await Task.Delay(500, token);
            }
        }
    }



    protected virtual async Task ProcessMessage(CloudQueueMessage message, CloudQueue queue, CancellationToken token)
    {
        var jobDetails = JobDetails.DeserializeJson(message.AsString);
        var result = await _jobRunner.RunJob(jobDetails, token);

        //todo handle error
        //if (result.Status == JobStatus.Error)

        await queue.DeleteMessageAsync(message);
    }

The JobRunner then runs the requested job. I have written a TestJob that simulates a long-running job which notices the cancellation request and, after a short cleanup period, exits early.

    public virtual async Task<JobResult> RunJob(JobDetails jobDetails, CancellationToken token)
    {
        switch (jobDetails.JobName.ToLower())
        {
            case "testjob":
                return await TestJob(jobDetails.Args, token);
        }
        return new JobResult(JobStatus.Error) { ErrorMessage = "The job requested does not exist." };
    }
    protected virtual async Task<JobResult> TestJob(List<string> jobArgs, CancellationToken token)
    {
        var message = "no message";
        if (jobArgs != null && jobArgs.Any())
            message = jobArgs[0];

        return await Task.Run(async () =>
        {
            Debug.WriteLine(string.Format("Start:{0}", message));
            for (int i = 1; i <= 800; i++)
            {
                if (token.IsCancellationRequested)
                {
                    Debug.WriteLine("Cancellation requested in TestJob");
                    //simulate short time to cleanup and exit early
                    Thread.Sleep(1500);
                    Debug.WriteLine("Cancellation job cleanup finished.");
                    token.ThrowIfCancellationRequested();
                }
                Thread.Sleep(10);
            }

            Debug.WriteLine(string.Format("Finish:{0}", message));
            return new JobResult(JobStatus.Success);
        });
    }

I have been searching and researching for two days now, including looking at the TPL Dataflow library, and have not yet found a way to make this work properly. I feel like the call to ProcessMessage(message, queue, token) is not being made correctly; there is even a compiler warning, 'Because this call is not awaited...'. But I don't want to await it (which is what the FixIt example does), because then no other jobs get noticed until the running one has finished. This does not seem like an uncommon use case, yet I cannot find anyone describing it.

Thank you in advance for any help!

Danny Green

2
You really should look into WebJobs; they handle all of this for you. - Stephen Cleary

2 Answers

0
votes

This happens because you are not honouring the task returned from ProcessMessage. As a result, ProcessMessagesAsync can finish before ProcessMessage has completed or been cancelled gracefully. Since you don't want to await ProcessMessage directly (that would make message processing sequential), I suggest keeping a list of running tasks: create a List<Task> in ProcessMessagesAsync and add the task returned from each ProcessMessage call to it. Then, after the while loop exits because the token was cancelled, go through the list and wait for every pending task to finish or observe the cancellation.

Sorry I don't have VS handy but I hope you get the point.
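
A minimal sketch of that idea, not tested against the Azure SDK. It assumes an in-memory ConcurrentQueue<string> as a hypothetical stand-in for the storage queue and a simulated job in place of the real ProcessMessage; the names QueuePumpSketch, FakeQueue, Completed and RunDemo are made up for illustration. Because only the loop itself touches the list, a plain List<Task> should be enough here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class QueuePumpSketch
{
    // Hypothetical stand-in for the storage queue (not the Azure SDK).
    public static readonly ConcurrentQueue<string> FakeQueue = new ConcurrentQueue<string>();
    public static int Completed;

    // Simulated job: observes the token, then takes a short time to clean up.
    private static async Task ProcessMessage(string message, CancellationToken token)
    {
        try
        {
            await Task.Delay(200, token);
        }
        catch (OperationCanceledException)
        {
            await Task.Delay(100); // simulated cleanup after cancellation
        }
        Interlocked.Increment(ref Completed);
    }

    public static async Task ProcessMessagesAsync(CancellationToken token)
    {
        var running = new List<Task>();
        while (!token.IsCancellationRequested)
        {
            string message;
            if (FakeQueue.TryDequeue(out message))
            {
                // Start the job but do not await it, so the loop keeps polling.
                running.Add(ProcessMessage(message, token));
            }
            else
            {
                try { await Task.Delay(50, token); }
                catch (OperationCanceledException) { /* shutting down */ }
            }
            // Drop finished jobs so the list does not grow without bound.
            // (Real code would log faulted tasks before dropping them.)
            running.RemoveAll(t => t.IsCompleted);
        }
        // Cancellation was requested: wait for in-flight jobs to wind down.
        await Task.WhenAll(running);
    }

    public static int RunDemo()
    {
        Completed = 0;
        for (int i = 0; i < 3; i++) FakeQueue.Enqueue("job" + i);
        using (var cts = new CancellationTokenSource())
        {
            Task pump = ProcessMessagesAsync(cts.Token);
            Thread.Sleep(100);  // let the jobs run for a moment
            cts.Cancel();
            pump.Wait();        // returns only after every job has finished
            return Completed;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Completed jobs: " + RunDemo()); // prints "Completed jobs: 3"
    }
}
```

The caller still has to wait on the task returned by ProcessMessagesAsync before letting the role stop; otherwise the Task.WhenAll at the end has no effect on shutdown.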

0
votes

Thank you, Sanjay. Based on your suggestion, I have come up with the following.

    /// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);

        var runningTasks = new ConcurrentDictionary<int, Task>();

        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                var t = ProcessMessage(message, queue, token);
                var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));
                while (true)
                {
                    if (runningTasks.TryAdd(t.Id, t))
                        break;
                    await Task.Delay(25);
                }                                    
            }                    
            else
            {
                try
                {
                    await Task.Delay(500, token);
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
        }
        while (!runningTasks.IsEmpty)
        {
            Debug.WriteLine("Waiting for running tasks");
            await Task.Delay(500);
        }

    }

    private static void RemoveRunningTask(int id, ConcurrentDictionary<int, Task> runningTasks)
    {
        while (true)
        {
            Task outTask;
            if (runningTasks.TryRemove(id, out outTask))
                break;
            // An un-awaited Task.Delay does not pause; block briefly before retrying instead.
            Thread.Sleep(25);
        }

    }

This seems to work, though I feel it is a little clumsy. I started out coding the 'ContinueWith' like this, but was surprised that the incoming task had a different Id value (I expected it to be the same Task):

    var task = ProcessMessage(message, queue, token).ContinueWith(x =>
    {
        while (true)
        {
            Task outTask;
            if (runningTasks.TryRemove(x.Id, out outTask))
                break;
            Task.Delay(25);
        }
    });
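
A likely explanation for the differing Id, sketched below: ContinueWith returns a new Task (the continuation), while the x passed to the lambda is the antecedent, i.e. the original ProcessMessage task. If the variable task above is what gets added to the dictionary, its Id will not match x.Id. The class and method names here (ContinuationIdSketch, Compare) are hypothetical, and Task.Delay stands in for ProcessMessage.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationIdSketch
{
    // Returns { lambda saw the original task's Id, continuation shares the original's Id }.
    public static bool[] Compare()
    {
        Task original = Task.Delay(10);   // stand-in for ProcessMessage(...)
        int idSeenInLambda = -1;

        // ContinueWith returns a NEW task; x is the antecedent (original).
        Task continuation = original.ContinueWith(x => { idSeenInLambda = x.Id; });
        continuation.Wait();

        return new[]
        {
            idSeenInLambda == original.Id,
            continuation.Id == original.Id
        };
    }

    public static void Main()
    {
        bool[] r = Compare();
        Console.WriteLine("Lambda saw the original task's Id: " + r[0]);
        Console.WriteLine("Continuation has the original's Id: " + r[1]);
    }
}
```

Note that the documentation for Task.Id says identifiers are assigned on demand and are not strictly guaranteed to be unique, so keying a dictionary on them is a convenience rather than a hard guarantee.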

UPDATE: It turns out that this still does not quite work; I somehow misread the results when testing earlier. Based on the MyFixIt example, I have the following code in the Worker Role's OnStop:

    public override void OnStop()
    {
        Debug.WriteLine("OnStop_Begin");
        tokenSource.Cancel();
        tokenSource.Token.WaitHandle.WaitOne();
        base.OnStop();
        Debug.WriteLine("Onstop_End");
        tokenSource.Dispose();
    }

It appears that tokenSource.Token.WaitHandle.WaitOne() does not actually wait until all of the tasks holding a reference to the token have finished, so the role carries on and stops even while tasks are still in the process of finishing up. Is there some way to properly use the token to signal when the cancellation has actually completed?

Thanks!

UPDATE 2

OK, I think I have a solution that now works. It appears that CancellationToken.WaitHandle is signaled as soon as .Cancel is called, so I'm not sure what purpose it serves to wait on it immediately after calling .Cancel; it seems the code would always continue straight through. This is how it is done in the FixIt example, but I don't really understand it. For my purposes, I have changed ProcessMessagesAsync so that it is passed a ManualResetEventSlim, which it sets after all tasks have finished. In OnStop I then wait on that event before finishing the stop.

    /// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token, ManualResetEventSlim reset)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);

        var runningTasks = new ConcurrentDictionary<int, Task>();

        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                var t = ProcessMessage(message, queue, token);
                var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));


                while (true)
                {
                    if (runningTasks.TryAdd(t.Id, t))
                        break;
                    await Task.Delay(25);
                }                                    
            }                    
            else
            {
                try
                {
                    await Task.Delay(500, token);
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
        }
        while (!runningTasks.IsEmpty)
        {
            Debug.WriteLine("Waiting for running tasks");
            await Task.Delay(500);
        }
        Debug.WriteLine("All tasks have finished, exiting ProcessMessagesAsync.");
        reset.Set();
    }

    public override void OnStop()
    {
        Debug.WriteLine("OnStop_Begin");
        tokenSource.Cancel();
        tokenSource.Token.WaitHandle.WaitOne();
        _reset.Wait();
        base.OnStop();
        Debug.WriteLine("Onstop_End");
        tokenSource.Dispose();
    }
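
For comparison, here is a small sketch of an alternative that avoids the extra event, under the assumption that the role keeps the Task returned by ProcessMessagesAsync in a field and waits on it in OnStop. It also illustrates the observation above that the token's WaitHandle is already signaled right after Cancel. Everything here (GracefulStopSketch, PumpAsync, StopAndObserve) is a hypothetical stand-in rather than the FixIt or Azure code, and PumpAsync only simulates a loop that needs about 300 ms to wind down.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class GracefulStopSketch
{
    // Stand-in for ProcessMessagesAsync: idles until cancelled, then "cleans up".
    public static async Task PumpAsync(CancellationToken token)
    {
        try { await Task.Delay(Timeout.Infinite, token); }
        catch (OperationCanceledException) { /* cancellation requested */ }
        await Task.Delay(300); // simulated time for running jobs to finish
    }

    // Returns { handle signaled after Cancel, pump done right after Cancel, pump done after Wait }.
    public static bool[] StopAndObserve()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task pump = PumpAsync(cts.Token);  // in a role: started in Run, kept in a field

            cts.Cancel();
            // The handle reflects that cancellation was REQUESTED, not that work has stopped.
            bool handleSignaled = cts.Token.WaitHandle.WaitOne(0);
            bool doneRightAfterCancel = pump.IsCompleted;

            pump.Wait();                       // blocks until the pump has really finished
            return new[] { handleSignaled, doneRightAfterCancel, pump.IsCompleted };
        }
    }

    public static void Main()
    {
        bool[] r = StopAndObserve();
        Console.WriteLine("WaitHandle signaled right after Cancel: " + r[0]);
        Console.WriteLine("Pump finished right after Cancel:       " + r[1]);
        Console.WriteLine("Pump finished after Wait:               " + r[2]);
    }
}
```

One caveat that applies to both this sketch and the ManualResetEventSlim version: if anything in the loop throws on cancellation (a cancelled queue.GetMessageAsync call might, though I have not verified that), the code after the loop never runs. With the event that would leave OnStop waiting indefinitely, whereas waiting on the task surfaces the exception instead.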