This is easily achieved with the TPL Dataflow library.
First, let's assume you have a BufferBlock<T>, which is your queue:

var queue = new BufferBlock<T>();
Then, you need the action to perform on each item; this is represented by the ActionBlock<T> class:
var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ...,
    });
Note that the constructor above takes an instance of ExecutionDataflowBlockOptions and sets the MaxDegreeOfParallelism property to however many concurrent items you want processed at the same time.

Underneath the surface, the Task Parallel Library is used to handle allocating threads for tasks, etc. TPL Dataflow is meant to be a higher-level abstraction which allows you to tweak just how much parallelism/throttling/etc. you want.
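For instance, a common starting point (purely illustrative; the right value depends entirely on your workload) is to cap concurrency at the processor count:

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Illustrative only: limit concurrent items to the CPU count.
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });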
For example, if you didn't want the ActionBlock<TInput> to buffer any items (preferring them to live in the BufferBlock<T>), you can also set the BoundedCapacity property, which will limit the number of items that the ActionBlock<TInput> will hold onto at once (which includes the number of items being processed, as well as reserved items):
var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ...,
        // Set to MaxDegreeOfParallelism to not buffer.
        BoundedCapacity = ...,
    });
Also, if you want a new, fresh Task<TResult> instance to process every item, then you can set the MaxMessagesPerTask property to one, indicating that each and every Task<TResult> will process one item:
var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ...,
        // Set to MaxDegreeOfParallelism to not buffer.
        BoundedCapacity = ...,
        // Process one item per task.
        MaxMessagesPerTask = 1,
    });
Note that depending on how many other tasks your application is running, this might or might not be optimal for you, and you might also want to consider the cost of spinning up a new task for every item that comes through the ActionBlock<TInput>.
From there, it's a simple matter of linking the BufferBlock<T> to the ActionBlock<TInput> with a call to the LinkTo method:
IDisposable connection = queue.LinkTo(action, new DataflowLinkOptions {
    PropagateCompletion = true
});
You set the PropagateCompletion property to true here so that, when the BufferBlock<T> is done (if/when there are no more items to process), its completion will be propagated to the ActionBlock<T>, which you might subsequently wait on.
Note that you can call the Dispose method on the IDisposable implementation returned from the call to LinkTo if you want the link between the blocks to be removed.
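For example, breaking the link before the pipeline completes would look something like this:

// Remove the link between the BufferBlock<T> and the ActionBlock<T>.
connection.Dispose();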
Finally, you post items to the buffer using the Post method:
queue.Post(new T());
And when you're done (if you are ever done), you call the Complete method:
queue.Complete();
Then, on the action block, you can wait until it's done by waiting on the Task instance exposed by the Completion property:
action.Completion.Wait();
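Since Completion is just a Task, in an async context you can also await it instead of blocking:

await action.Completion;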
Hopefully, the elegance of this is clear:
- You don't have to manage the creation of new Task instances/threads/etc. to manage the work; the blocks do it for you based on the settings you provide (and this is on a per-block basis).
- Cleaner separation of concerns. The buffer is separated from the action, as are all the other blocks. You build the blocks and then link them together.
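Putting it all together, here's a minimal end-to-end sketch. The int payload, the concrete option values, and the console output are illustrative assumptions (you'd substitute your own item type and processing logic), and it requires the System.Threading.Tasks.Dataflow package:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Example
{
    static async Task Main()
    {
        // The queue that producers post into.
        var queue = new BufferBlock<int>();

        // The action that consumes items, with illustrative option values.
        var action = new ActionBlock<int>(
            n => Console.WriteLine($"Processed {n}"),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 4
            });

        // Link the queue to the action and propagate completion.
        IDisposable connection = queue.LinkTo(action,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Produce some items.
        for (int i = 0; i < 10; i++)
            queue.Post(i);

        // Signal that no more items are coming, then wait for processing to finish.
        queue.Complete();
        await action.Completion;

        // Optionally remove the link once everything is done.
        connection.Dispose();
    }
}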