
I have a BlockingCollection which I write to from one thread and read from another. The producer thread takes items received from a server and adds them to the BlockingCollection, while the consumer thread attempts to empty the collection and process the items.

The problem is that I am trying to empty the queue in batches, because processing the items one by one would be too slow. But when the collection is constantly being written to (thousands of items), the consumer thread keeps taking items in an attempt to drain it completely first, which means that the processing will not even start until the writing is done.

Now, the processing in the consumer can be done in parallel, so I have been wondering how to go about that.

Currently I have 2 ideas:

  1. After a certain number of items are read from the BlockingCollection in the consumer, start a new parallel job that processes them, instead of waiting to completely empty the queue and THEN start processing (see the sketch below the list).

  2. Use multiple consumers and hope that they will run in parallel instead of just constantly blocking each other while trying to read the BlockingCollection at the same time.

So my question is about option number 2 - is the BlockingCollection internally optimized for such a case? Will it partition the areas that are read from, or will the consumers fight over each item? And if they do fight over each item, would that make option 1 superior?
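
For reference, here is a rough sketch of what I mean by option 1 (collection, Item, Process and the batch size are placeholder names/values):

const int batchSize = 100; // tune to the workload
var batch = new List<Item>(batchSize);
foreach (var item in collection.GetConsumingEnumerable())
{
    batch.Add(item);
    if (batch.Count == batchSize)
    {
        var full = batch;                  // hand the full batch off...
        Task.Run(() => Process(full));     // ...to a parallel job...
        batch = new List<Item>(batchSize); // ...and keep reading into a fresh one
    }
}
if (batch.Count > 0)
    Process(batch); // final partial batch, after CompleteAdding() is called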

Fildor is suggesting option 1, surely? – Matthew Watson
"But when it's being constantly written to (thousands of items), then the consumer thread keeps reading them until it's emptied, which means that the processing will not even start until the writing is done." I have read that sentence multiple times and can't work out what it means. – mjwills
@Fildor Ah, in that case the OP should "stop doing that". :) – mjwills
"Use multiple consumers and hope that they will run in parallel instead of just constantly blocking each other while trying to read the BlockingCollection at the same time." That is literally its job, so I presume it will be OK. But profile to be sure. If it is a problem, have a single consumer that batches entries into a List<entry> and then adds them to a second BlockingCollection (that is consumed from by multiple readers) to reduce the contention. But honestly it is unlikely to be needed. – mjwills
Before you jump to more custom-made solutions, why don't you try the easy fix to your current solution that has now been suggested by 2 users? And if that doesn't please you, why don't you take the opposite path and try the even higher abstraction that has been created by very smart people, with dozens of testers, and that has been out in the field for several years now? – Fildor

2 Answers

2 votes

To add just another alternative (in no way production-ready!):

This makes use of TPL Dataflow, where BatchBlock<T> abstracts the batching away for us.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class HoneyBatcher
{
    private const int BATCHSIZE = 10; // Find the size that works best for you.
    private readonly BatchBlock<Honey> batchBlock;
    private readonly ExecutionDataflowBlockOptions _options = 
                     new ExecutionDataflowBlockOptions()
    {
         // I'd start with 1, then benchmark whether a higher number actually helps.
         MaxDegreeOfParallelism = 1, 
         SingleProducerConstrained = true // only valid with a single producing thread; may micro-optimize throughput
    };
                       // vv Whatever process you want done on a batch
    public HoneyBatcher( Action<Honey[]> batchProcessor )
    {
        // BatchBlock does the batching
        // and is the entry point to the pipeline.
        batchBlock = new BatchBlock<Honey>(BATCHSIZE);
        // processorBlock processes each batch that batchBlock will produce
        // Parallel executions as well as other tweaks can be configured through options.
        ActionBlock<Honey[]> processorBlock = 
                             new ActionBlock<Honey[]>(batchProcessor, _options);
        // build the pipeline
        batchBlock.LinkTo(processorBlock);
        // item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
    }

    // Add item individually and have them batched up
    // and processed in a pipeline.
    public Task<bool> ProcessAsync(Honey item)
    {
        return batchBlock.SendAsync(item);
        // Can also be done with sync API.
    }
}

public class Honey 
{
    // Just a dummy
}

Be advised that the above snippet is just a rough outline of the idea. In production you would of course address error handling, completion, etc.
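
For illustration, a hypothetical usage could look like this (the lambda body and ReceiveFromServer() are of course placeholders):

var batcher = new HoneyBatcher(batch =>
{
    Console.WriteLine($"Processing a batch of {batch.Length} items");
});

foreach (Honey item in ReceiveFromServer())
    await batcher.ProcessAsync(item); // returns false if the block declined the item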

0 votes

A natural way to process the items in batches would be to insert them into the BlockingCollection in batches in the first place, instead of trying to remove them in batches later. In other words, you could use a BlockingCollection<T[]> instead of a BlockingCollection<T>. The producer thread could do the batching easily by using a Queue<T>:

var queue = new Queue<T>();
while (someCondition)
{
    var item = ProduceItem();
    queue.Enqueue(item);
    if (queue.Count == batchSize)
    {
        // A full batch is ready; hand it over as a single T[].
        blockingCollection.Add(queue.ToArray());
        queue.Clear();
    }
}
// Flush the last, possibly partial, batch.
if (queue.Count > 0)
{
    blockingCollection.Add(queue.ToArray());
    queue.Clear();
}
blockingCollection.CompleteAdding();

Depending on the situation you could also use a LINQ-style operator like Batch from the MoreLinq library.
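
For example, assuming the items arrive as an IEnumerable<T> (ProduceItems() below is a placeholder), a sketch with MoreLinq's Batch could look like this:

using System.Linq;
using MoreLinq;

// Batch splits the sequence into chunks of batchSize items.
foreach (var batch in ProduceItems().Batch(batchSize))
{
    blockingCollection.Add(batch.ToArray());
}
blockingCollection.CompleteAdding();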

Finally, to answer your main question: yes, the BlockingCollection class handles multiple consumers, as well as multiple producers, excellently. When the collection is empty all consumers are blocked, and when an item arrives it is handed to one of the waiting consumers.
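
If you do go with multiple consumers, a minimal sketch could look like this (consumerCount and ProcessBatch are placeholders):

Task[] consumers = Enumerable.Range(0, consumerCount)
    .Select(_ => Task.Run(() =>
    {
        // Each consumer competes for the next available batch.
        foreach (T[] batch in blockingCollection.GetConsumingEnumerable())
            ProcessBatch(batch);
    }))
    .ToArray();

Task.WaitAll(consumers); // returns after CompleteAdding() is called and the collection is drained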