15
votes

I've come up with some code to consume all waiting items from a queue. Rather than processing the items one by one, it makes sense to process all the waiting items as a set.

I've declared my queue like this.

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>());

Then, on a consumer thread, I plan to read the items in batches like this:

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while (this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}

This approach lacks the utility of GetConsumingEnumerable and I can't help wondering if I've missed a better way, or if my approach is flawed.
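
For reference, the usual one-at-a-time pattern I'd be giving up looks like this:

foreach (var item in this.items.GetConsumingEnumerable())
{
    // process one item at a time
}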

Is there a better way to consume a BlockingCollection<T> in batches?

2
You could do a Take(50) on the ConsumingEnumerable, but you would lose the effect of the 50ms timeout. So choose what is more important. – Henk Holterman
@HenkHolterman, you're right, I don't actually need that; it would be problematic if items were produced at a faster rate. – Jodrell
@HenkHolterman, question edited accordingly. – Jodrell
No, a faster rate would be no problem; a slower one would. Your current code (w/o timeout) is less suitable to be replaced by ConsumingEnumerable. – Henk Holterman
That's what I meant with making a choice: min/max batch size and min/max waiting time. – Henk Holterman
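
For illustration, here is a minimal sketch of the trade-off discussed in the comments, assuming .NET 6+ for Enumerable.Chunk: the batch size is capped at 50, but each batch blocks until 50 items arrive or the collection completes, so there is no waiting-time cap.

foreach (var batch in this.items.GetConsumingEnumerable().Chunk(50))
{
    // process up to 50 items at a time
}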

2 Answers

5
votes

A solution is to use the BufferBlock<T> from System.Threading.Tasks.Dataflow (which is included in .NET Core 3+). It does not use GetConsumingEnumerable(), but it still gives you the same utility, mainly:

  • allows parallel processing with multiple (symmetrical and/or asymmetrical) consumers and producers
  • thread-safe (allowing for the above), with no race conditions to worry about
  • can be cancelled by a cancellation token and/or collection completion
  • consumers block until data is available, avoiding wasting CPU cycles on polling

There is also a BatchBlock<T>, but that limits you to fixed-size batches (a brief sketch follows the example below).

var buffer = new BufferBlock<Item>();
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
    {
        // process items
    }
}
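
For comparison, a fixed-size batching loop with BatchBlock<T> might look like this (a minimal sketch; the batch size of 50 is arbitrary):

var batch = new BatchBlock<Item>(50);
while (await batch.OutputAvailableAsync())
{
    // ReceiveAsync yields Item[] arrays of exactly 50 items;
    // only the final batch, after Complete(), may be smaller
    Item[] items = await batch.ReceiveAsync();
    // process items
}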

Here is a working example, which demonstrates the following:

  • multiple symmetrical consumers which process variable length batches in parallel
  • multiple symmetrical producers (not truly operating in parallel in this example)
  • ability to complete the collection when the producers are done
  • to keep the example short, I did not demonstrate the use of a CancellationToken (a brief cancellation sketch follows this list)
  • ability to wait until the producers and/or consumers are done
  • ability to call from an area that doesn't allow async, such as a constructor
  • the Thread.Sleep() calls are not required, but help simulate some processing time that would occur in more taxing scenarios
  • both the Task.WaitAll() and the Thread.Sleep() can optionally be converted to their async equivalents
  • no need to use any external libraries
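
As an aside, a cancellation-aware version of the receive loop might look like this (a minimal sketch; the token source and its wiring are assumed and are not part of the demo below):

var cts = new CancellationTokenSource();
try
{
    // OutputAvailableAsync throws OperationCanceledException once the token fires
    while (await buffer.OutputAvailableAsync(cts.Token))
    {
        if (buffer.TryReceiveAll(out var items))
        {
            // process items
        }
    }
}
catch (OperationCanceledException)
{
    // the consumer was shut down via cancellation rather than Complete()
}
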
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            // copy the loop variable before the lambda captures it,
            // otherwise every consumer could observe the final value of i
            var num = i;

            // Unwrap() so the stored task completes when the async lambda
            // finishes, not merely when it starts
            consumers.Add(Task.Factory.StartNew(async () =>
            {
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}:    " + 
                            items.Aggregate((a, b) => a + ", " + b));

                    // real life processing would take some time
                    await Task.Delay(500); 
                }

                Console.WriteLine($"Consumer {num} complete");
            }).Unwrap());

            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100); 
        }

        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            // copy the loop variable before the lambda captures it
            var pid = i;
            producers.Add(Task.Factory.StartNew(() =>
            {
                for (int j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
                    buffer.Post(j.ToString());
            }));

            // space out the producers for a better demo
            Thread.Sleep(10); 
        }

        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete(); 

        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray()); 
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }
}

Here is a modernised and simplified version of the code.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));

            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }

        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));

            // space out the producers for a better demo
            await Task.Delay(10);
        }

        // may also use the async equivalent
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // may also use the async equivalent
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }

    private static async Task StartConsumer(
            int id,
            IReceivableSourceBlock<string> buffer)
    {
        while (await buffer.OutputAvailableAsync())
        {
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " + 
                    items.Aggregate((a, b) => a + ", " + b));
            }

            // real life processing would take some time
            await Task.Delay(500);
        }

        Console.WriteLine($"Consumer {id} complete");
    }

    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }

        return Task.CompletedTask;
    }
}

2
votes

While not as good as ConcurrentQueue<T> in some ways, my own LLQueue<T> allows for a batched dequeue with an AtomicDequeueAll method, where all items currently on the queue are taken from it in a single (atomic and thread-safe) operation and are then in a non-thread-safe collection for consumption by a single thread. This method was designed precisely for the scenario where you want to batch the read operations.

This isn't blocking, though it could be used to create a blocking collection easily enough:

public class BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store = new LLQueue<T>();
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}

That's a starting point that doesn't do the following:

  1. Deal with a pending waiting reader upon disposal.
  2. Worry about a potential race with multiple readers both being triggered by a write happening while one was reading (it just considers the occasional empty result enumerable to be okay).
  3. Place any upper-bound on writing.

All of which could be added, but I wanted to keep it to the minimum of some practical use that hopefully isn't buggy within the limitations defined above.
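
For instance, one hypothetical way to tackle point 1 is a disposal flag plus a final Set(), so that a blocked reader wakes and exits cleanly. The class and member names here are illustrative and not part of LLQueue:

using System;
using System.Collections.Generic;
using System.Threading;

public sealed class DisposableBlockingBatchedQueue<T> : IDisposable
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store = new LLQueue<T>();
  private volatile bool _disposed;

  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }

  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    // re-check the flag after waking, since Dispose() sets it before signalling
    if (!_disposed && _are.WaitOne(millisecTimeout) && !_disposed)
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }

  public void Dispose()
  {
    _disposed = true; // readers that wake from now on will bail out
    _are.Set();       // wakes at most one blocked reader (AutoResetEvent)
  }
}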