3 votes

Hi everyone. I'm using BlockingCollection<T> in the traditional producer-consumer scenario. To process items in the collection one by one, I have to write this code:

while (...)
{
  var item = collection.Take(cancellationTokenSource.Token);
  ProcessItem(item);
}

But how do I process a batch of N items (wait until collection has less than N items)? My solution uses a temporary buffer:

var buffer = new List<MyType>(N);

while (...)
{
  var item = collection.Take(cancellationTokenSource.Token);

  buffer.Add(item);
  if (buffer.Count == N)
  {
    foreach (var bufferedItem in buffer)
    {
      ProcessItem(bufferedItem);
    }

    buffer.Clear();
  }
}

But this seems very ugly to me... Is there a better approach?

[UPDATE]: Here's a prototype of an extension method that makes the solution more readable. Maybe someone will find it useful:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionExtensions
{
    public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,
        CancellationToken cancellationToken, Int32 bufferSize)
    {
        var buffer = new List<T>(bufferSize);

        while (buffer.Count < bufferSize)
        {
            try
            {
                buffer.Add(collection.Take(cancellationToken));
            }
            catch (OperationCanceledException)
            {
                // we need to handle the rest of buffer,
                // even if the task has been cancelled.
                break;
            }
        }

        return buffer;
    }
}

And usage (note that TakeBuffer returns at most bufferSize items per call, so a long-running consumer would call it in an outer loop):

foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token, 5))
{
  // TODO: process items here...
}

Of course, this is not a complete solution: for example, I would add timeout support - if there aren't enough items but the allotted time has elapsed, we should stop waiting and process the items already added to the buffer.
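A minimal sketch of such an overload, assuming it sits next to TakeBuffer in the same static class; the timeout parameter and the deadline bookkeeping below are my additions, not part of the original prototype:

public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,
    CancellationToken cancellationToken, Int32 bufferSize, TimeSpan timeout)
{
    var buffer = new List<T>(bufferSize);
    var deadline = DateTime.UtcNow + timeout;

    while (buffer.Count < bufferSize)
    {
        var remaining = deadline - DateTime.UtcNow;
        if (remaining <= TimeSpan.Zero)
        {
            break; // time is up: return whatever has been collected so far
        }

        try
        {
            T item;
            // TryTake waits up to the remaining time for the next item.
            if (!collection.TryTake(out item, (int)remaining.TotalMilliseconds, cancellationToken))
            {
                break; // timed out with a partially filled buffer
            }
            buffer.Add(item);
        }
        catch (OperationCanceledException)
        {
            break; // as above, still return the partially filled buffer
        }
    }

    return buffer;
}

On timeout or cancellation the method simply returns whatever it has gathered, so the caller's foreach works unchanged.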


3 Answers

0 votes

I don't find that solution all that ugly. Batch processing is an orthogonal requirement to what the blocking collection does and should be treated as such. I would encapsulate the batch-processing behaviour in a BatchProcessor class with a clean interface, but other than that I don't really see a problem with the approach.
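As a rough illustration of that suggestion, here is a minimal sketch of such a wrapper; the BatchProcessor name and its members are hypothetical, chosen only to show the shape of the encapsulation:

public class BatchProcessor<T>
{
    private readonly BlockingCollection<T> _collection;
    private readonly int _batchSize;
    private readonly Action<IList<T>> _processBatch;

    public BatchProcessor(BlockingCollection<T> collection, int batchSize,
        Action<IList<T>> processBatch)
    {
        _collection = collection;
        _batchSize = batchSize;
        _processBatch = processBatch;
    }

    // Blocks the calling thread, filling and processing batches until cancellation.
    public void Run(CancellationToken cancellationToken)
    {
        var buffer = new List<T>(_batchSize);
        try
        {
            while (true)
            {
                buffer.Add(_collection.Take(cancellationToken));
                if (buffer.Count == _batchSize)
                {
                    _processBatch(buffer);
                    buffer.Clear();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Flush the partially filled batch when cancelled.
            if (buffer.Count > 0)
            {
                _processBatch(buffer);
            }
        }
    }
}

The consumer then just constructs a BatchProcessor and calls Run, keeping all the buffering details out of the processing loop.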

0 votes

You may find that BlockingCollection - a lock-free queue implementation under the hood - is a premature optimization here. You might be able to write cleaner code if you take a step back and use a plain Queue<T> with Monitor-based locks.

0 votes

First of all, I'm not sure your logic is correct. You say you want to wait until collection has less than N items - isn't it the other way around? You want the collection to have N or more items, in order to process N items. Or perhaps I'm misunderstanding.

Then I'd also suggest processing items one by one when there are fewer than N, or you may find that your application seems to hang at N-1 items. Of course, if this is a steady stream of data, processing only when buffer.Count >= N could be good enough.

I'd suggest going for a queue and Monitor, like GregC says.

Something like this:

public object Dequeue()
{
  lock (_queue)
  {
    // Monitor.Wait must be called while holding the lock on _queue.
    while (_queue.Count < N)
    {
      Monitor.Wait(_queue);
    }
    return _queue.Dequeue();
  }
}

public void Enqueue(object q)
{
  lock (_queue)
  {
    _queue.Enqueue(q);
    if (_queue.Count == N)
    {
      // wake up any blocked Dequeue call(s)
      Monitor.PulseAll(_queue);
    }
  }
}
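If you want the Monitor-based version to hand back a whole batch at once, as in the question, a hypothetical DequeueBatch along the same lines might look like this (the method name and the List<object> return type are my own choices, not part of the answer above):

public List<object> DequeueBatch()
{
  lock (_queue)
  {
    // Wait until at least N items have been enqueued.
    while (_queue.Count < N)
    {
      Monitor.Wait(_queue);
    }

    // Drain exactly N items while still holding the lock.
    var batch = new List<object>(N);
    for (int i = 0; i < N; i++)
    {
      batch.Add(_queue.Dequeue());
    }
    return batch;
  }
}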