4 votes

I have an unlimited number of tasks in a db queue somewhere. What is the best way to have a program working on n tasks simultaneously on n different threads, starting new tasks as old ones get done? When one task finishes, another task should asynchronously begin. The currently-running count should always be n.

My initial thought was to use a thread pool, but that seems unnecessary considering that the tasks to be worked on will be retrieved within the individual threads. In other words, each thread will on its own go get its next task rather than having a main thread get tasks and then distribute them.

I see multiple options for doing this, and I don't know which one I should use for optimal performance.

1) Thread Pool - In light of there not necessarily being any waiting threads, I'm not sure this is necessary.

2) Semaphore - Same as 1. What's the benefit of a semaphore if there aren't tasks waiting to be allocated by the main thread?

3) Same Threads Forever - Kick the program off with n threads. When a thread is done working, it gets the next task itself. The main thread just monitors to make sure the n threads are still alive (see the rough sketch after this list).

4) Event Handling - Same as 3, except that when a thread finishes a task, it fires off an ImFinished event before dying. An ImFinished event handler kicks off a new thread. This seems just like 3 but with more overhead (since new threads are constantly being created).

5) Something else?
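
To make option 3 concrete, here's roughly what I have in mind; GetNextTask() is a hypothetical placeholder for whatever pulls the next task from the db queue:

// Rough sketch of option 3. GetNextTask() is hypothetical; assume it
// returns null when the db queue is momentarily empty.
for (int i = 0; i < n; i++)
{
    new Thread(() =>
    {
        while (true)
        {
            Action task = GetNextTask(); // hypothetical db-queue call
            if (task == null)
            {
                Thread.Sleep(1000); // nothing to do; poll again later
                continue;
            }
            task();
        }
    }).Start();
}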


4 Answers

4 votes

BlockingCollection makes this whole thing pretty trivial:

using System;
using System.Collections.Concurrent;
using System.Threading;

var queue = new BlockingCollection<Action>();

int numWorkers = 5;

for (int i = 0; i < numWorkers; i++)
{
    Thread t = new Thread(() =>
    {
        // Blocks when the queue is empty; the enumeration ends once
        // CompleteAdding is called and the queue drains.
        foreach (var action in queue.GetConsumingEnumerable())
        {
            action();
        }
    });
    t.Start();
}

You can then have the main thread add items to the blocking collection after starting the workers (or before, if you want). You can even spawn multiple producer threads to add items to the queue.
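
For completeness, here's a minimal producer sketch; FetchNextTaskFromDb() is a hypothetical method standing in for your db access. Calling CompleteAdding is what makes GetConsumingEnumerable finish, so the workers exit cleanly:

// Producer: pull work from the db and feed the workers.
// FetchNextTaskFromDb() is hypothetical; assume it returns null
// when the db queue is empty.
Action work;
while ((work = FetchNextTaskFromDb()) != null)
{
    queue.Add(work);
}

// Signal the workers that no more items are coming;
// GetConsumingEnumerable ends once the queue drains.
queue.CompleteAdding();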

Note that the more conventional approach would be to use Tasks instead of using Thread classes directly. The primary reason I didn't suggest it first is that you specifically requested an exact number of threads to be running (rather than a maximum), and you just don't have as much control over how Task objects are run (which is good; they can be optimized on your behalf). If that control isn't as important as you have stated, the following may end up being preferable:

var queue = new BlockingCollection<Action>();

int numWorkers = 5;

for (int i = 0; i < numWorkers; i++)
{
    // LongRunning hints to the scheduler that this task will run for
    // a long time, so it typically gets a dedicated thread rather
    // than tying up a thread-pool thread.
    Task.Factory.StartNew(() =>
    {
        foreach (var action in queue.GetConsumingEnumerable())
        {
            action();
        }
    }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
0 votes

I like model #3, and have used it before; it reduces the number of threads starting and stopping, and makes the main thread a true "supervisor", reducing the work it has to do.

As Servy has indicated, the System.Collections.Concurrent namespace has a few constructs that are extremely valuable here. ConcurrentQueue is a thread-safe FIFO collection designed for exactly this model: one or more "producer" threads add elements to the "input" side of the queue, while one or more "consumers" take elements out of the other end. If there is nothing in the queue, TryDequeue simply returns false; you can react to that by exiting the task method (the supervisor can then decide whether to start another task, probably by monitoring the input to the queue and ramping up when more items come in).
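
For example, a consumer loop over a ConcurrentQueue might look like the sketch below; the part to note is how the worker reacts to an empty queue:

var queue = new ConcurrentQueue<Action>();

// Worker loop: TryDequeue returns false when the queue is empty,
// and this worker responds by exiting.
Action item;
while (queue.TryDequeue(out item))
{
    item();
}
// The queue was empty; the supervisor can decide whether to spin up
// another worker when more items arrive.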

BlockingCollection adds the behavior of making consumer threads wait when they attempt to take a value from an empty queue. It can also be configured with a maximum capacity, above which it blocks "producer" threads from adding more elements until there is available capacity. BlockingCollection uses a ConcurrentQueue by default, but you can back it with any IProducerConsumerCollection, such as a ConcurrentStack or ConcurrentBag. Using this model, the tasks can run indefinitely; when there's nothing to do they simply block until there is something for at least one of them to work on, so all the supervisor has to check for is tasks erroring out (a critical element of any robust threaded workflow pattern).
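
As a sketch of that bounded configuration (the capacity of 100 is an arbitrary choice):

// Bounded to 100 items: Add blocks producers once the collection is
// full, until a consumer takes something out.
var bounded = new BlockingCollection<Action>(boundedCapacity: 100);

// To use LIFO ordering instead of the default ConcurrentQueue,
// back the collection with a ConcurrentStack:
var lifo = new BlockingCollection<Action>(
    new ConcurrentStack<Action>(), 100);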

0 votes

This is easily achieved with the TPL Dataflow library.

First, let's assume you have a BufferBlock<T>; this is your queue:

var queue = new BufferBlock<T>();

Then, you need the action to perform on each item; this is represented by the ActionBlock<TInput> class:

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ..., 
    });

Note the constructor above: it takes an instance of ExecutionDataflowBlockOptions and sets the MaxDegreeOfParallelism property to however many items you want processed at the same time.

Underneath the surface, the Task Parallel Library handles allocating threads for tasks, and so on. TPL Dataflow is meant to be a higher-level abstraction that lets you tweak exactly how much parallelism/throttling you want.

For example, if you didn't want the ActionBlock<TInput> to buffer any items (preferring them to live in the BufferBlock<T>), you can also set the BoundedCapacity property, which will limit the number of items that the ActionBlock<TInput> will hold onto at once (which includes the number of items being processed, as well as reserved items):

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ..., 

        // Set to MaxDegreeOfParallelism to not buffer.
        BoundedCapacity = ..., 
    });

Also, if you want a new, fresh Task<TResult> instance to process every item, then you can set the MaxMessagesPerTask property to one, indicating that each and every Task<TResult> will process one item:

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ..., 

        // Set to MaxDegreeOfParallelism to not buffer.
        BoundedCapacity = ..., 

        // Process one item per task.
        MaxMessagesPerTask = 1,
    });

Note that depending on how many other tasks your application is running, this might or might not be optimal for you, and you might also want to think of the cost of spinning up a new task for every item that comes through the ActionBlock<TInput>.

From there, it's a simple matter of linking the BufferBlock<T> to the ActionBlock<TInput> with a call to the LinkTo method:

IDisposable connection = queue.LinkTo(action, new DataflowLinkOptions {
    PropagateCompletion = true
});

You set the PropagateCompletion property to true here so that, when the BufferBlock<T> completes (if/when there are no more items to process), its completion is passed along to the ActionBlock<TInput>, which you might subsequently wait on.

Note that you can call the Dispose method on the IDisposable implementation returned from the call to LinkTo if you want the link between the blocks to be removed.

Finally, you post items to the buffer using the Post method:

queue.Post(new T());

And when you're done (if you are ever done), you call the Complete method:

queue.Complete();

Then, on the action block, you can wait until it's done by waiting on the Task instance exposed by the Completion property:

action.Completion.Wait();

Hopefully, the elegance of this is clear:

  • You don't have to manage the creation of new Task instances/threads/etc to manage the work, the blocks do it for you based on the settings you provide (and this is on a per-block basis).
  • Cleaner separation of concerns. The buffer is separated from the action, as are all the other blocks. You build the blocks and then link them together.
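
Putting it all together, a minimal end-to-end sketch might look like this (using int for T, a placeholder processing delegate, and an arbitrary degree of parallelism):

using System;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main()
    {
        var queue = new BufferBlock<int>();

        // Process up to four items concurrently (arbitrary choice).
        var action = new ActionBlock<int>(
            i => Console.WriteLine("Processed {0}", i),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
            });

        // Link the buffer to the action block, propagating completion.
        queue.LinkTo(action, new DataflowLinkOptions
        {
            PropagateCompletion = true,
        });

        // Post some work, then signal that no more is coming.
        for (int i = 0; i < 20; i++)
        {
            queue.Post(i);
        }
        queue.Complete();

        // Wait for the action block to finish processing everything.
        action.Completion.Wait();
    }
}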
-1 votes

I'm a VB guy, but you can easily translate:

Private Async Sub foo()

    ' Run at most n jobs in parallel.
    Dim n As Integer = 16
    Dim l As New List(Of Task)
    Dim jobs As New Queue(Of Integer)(Enumerable.Range(1, 100))

    ' Start the first n tasks.
    For i = 1 To n
        Dim j = jobs.Dequeue()
        l.Add(Task.Run(Sub()
                           Threading.Thread.Sleep(500) ' simulate work
                           Console.WriteLine(j)
                       End Sub))
    Next

    ' Each time a task finishes, replace it with a task for the
    ' next job, until the queue is empty.
    While l.Count > 0
        Dim t = Await Task.WhenAny(l)
        If jobs.Count > 0 Then
            Dim j = jobs.Dequeue()
            l(l.IndexOf(t)) = Task.Run(Sub()
                                           Threading.Thread.Sleep(500) ' simulate work
                                           Console.WriteLine(j)
                                       End Sub)
        Else
            l.Remove(t)
        End If
    End While

End Sub
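
A rough C# translation of the same sketch (keeping the placeholder 500 ms jobs) might look like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Worker
{
    // Keep n tasks running; as each one finishes, replace it with a
    // task for the next job while jobs remain.
    public static async Task RunAsync()
    {
        int n = 16;
        var tasks = new List<Task>();
        var jobs = new Queue<int>(Enumerable.Range(1, 100));

        for (int i = 0; i < n; i++)
        {
            int job = jobs.Dequeue();
            tasks.Add(Task.Run(() =>
            {
                Thread.Sleep(500); // placeholder for real work
                Console.WriteLine(job);
            }));
        }

        while (tasks.Count > 0)
        {
            Task finished = await Task.WhenAny(tasks);
            if (jobs.Count > 0)
            {
                int job = jobs.Dequeue();
                tasks[tasks.IndexOf(finished)] = Task.Run(() =>
                {
                    Thread.Sleep(500); // placeholder for real work
                    Console.WriteLine(job);
                });
            }
            else
            {
                tasks.Remove(finished);
            }
        }
    }
}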

There's an article from Stephen Toub on why you shouldn't use Task.WhenAny in this way with a large list of tasks, but with "some" tasks you usually don't run into a problem.

The idea is quite simple: you have a list to which you add as many (running) tasks as you want running in parallel. Then you (a)wait for the first one to finish. If there are still jobs in the queue, you assign the next job to a new task and then (a)wait again. If there are no jobs left in the queue, you simply remove the finished task. If both your task list and the queue are empty, you are done.

The Stephen Toub article: http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx