Hi everyone. I'm using BlockingCollection in a traditional producer-consumer scenario. To process the items in the collection one by one, I write code like this:
while (...)
{
    var item = collection.Take(cancellationTokenSource.Token);
    ProcessItem(item);
}
But how can I process items in batches of N (that is, keep waiting while the collection has fewer than N items)? My solution uses a temporary buffer:
var buffer = new List<MyType>(N);
while (...)
{
    var item = collection.Take(cancellationTokenSource.Token);
    buffer.Add(item);
    if (buffer.Count == N)
    {
        // Process the whole batch, then reuse the buffer for the next one.
        foreach (var bufferedItem in buffer)
        {
            ProcessItem(bufferedItem);
        }
        buffer.Clear();
    }
}
But this seems very ugly to me. Is there a better approach?
[UPDATE]: Here's a prototype of an extension method that makes the solution more readable. Maybe someone will find it useful:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionExtensions
{
    public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,
        CancellationToken cancellationToken, Int32 bufferSize)
    {
        var buffer = new List<T>(bufferSize);
        while (buffer.Count < bufferSize)
        {
            try
            {
                buffer.Add(collection.Take(cancellationToken));
            }
            catch (OperationCanceledException)
            {
                // We still need to handle the items already in the buffer,
                // even if the task has been cancelled.
                break;
            }
        }
        return buffer;
    }
}
And usage:
foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token, 5))
{
    // TODO: process items here...
}
Of course, this is not a complete solution. For example, I would add timeout support: if there aren't enough items but the time has elapsed, we need to stop waiting and process the items already added to the buffer.
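A rough sketch of what that timeout support might look like, built on the existing TryTake(out item, millisecondsTimeout, cancellationToken) overload. The name TakeBufferWithTimeout, its parameter order, and the choice to treat the timeout as a budget for the whole batch (rather than per item) are my own assumptions, and it assumes the timeout fits in an Int32 number of milliseconds:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class BlockingCollectionTimeoutExtensions
{
    // Hypothetical helper, not part of the framework: collects up to bufferSize
    // items, but stops waiting once the overall timeout has elapsed.
    public static List<T> TakeBufferWithTimeout<T>(this BlockingCollection<T> collection,
        int bufferSize, TimeSpan timeout, CancellationToken cancellationToken)
    {
        var buffer = new List<T>(bufferSize);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            while (buffer.Count < bufferSize)
            {
                // Time left in the budget for the whole batch.
                var remaining = timeout - stopwatch.Elapsed;
                if (remaining < TimeSpan.Zero)
                {
                    // Budget used up: a zero wait still picks up items
                    // that are already sitting in the collection.
                    remaining = TimeSpan.Zero;
                }

                // TryTake returns false if nothing arrived in time, or if the
                // collection is completed and empty.
                if (!collection.TryTake(out T item, (int)remaining.TotalMilliseconds, cancellationToken))
                {
                    break;
                }
                buffer.Add(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelled: fall through and return whatever was collected so far.
        }
        return buffer;
    }
}
```

The caller then gets either a full batch or a partial one (possibly empty), so it should check the count before processing.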