
I've read a few similar-but-not-the-same questions while trying to find an answer, for example: How to consume a BlockingCollection<T> in batches

However, the approach in that question does not use GetConsumingEnumerable, which seems fishy.

What is the correct method to effectively block producers while the consumer (should be singular) empties the collection?

[We want batch processing because each batch makes one web service call, which would be a bottleneck if every single message/item needed its own call. Batching the messages/items removes that bottleneck.]

Ideally:

1) Receive message

2) New producer task to push into collection

3) When the collection is 'full' (an arbitrary limit), block all producers, start a new consumer task that consumes ALL of the collection, then unblock the producers.

In other words, I want (parallel producers) xor (single consumer) acting on the collection at any time.

Seems like this should have been done before, but I can't seem to find a code snippet that specifically acts this way.

Thanks for any help.

It sounds as if you want to run the producers and the consumer serially rather than in parallel. If you don't mind the producers running while the consumer is processing, then a double buffer would be better than a single collection. – Euphoric
After the queue becomes full, you normally wouldn't wait for the queue to be completely empty before allowing the producers to start populating it again. Why do you want to do that? – Matthew Watson
"Normally wouldn't wait" = exactly why I posted this. I have a requirement (to avoid data loss if the program dies) to process batches of N messages or every M milliseconds. If messages are coming in very quickly and being produced into the same BlockingCollection, we check each time whether the collection has N items in it. Once it does, we spin off (what should be a single) consumer task to batch process them all. However, if the consumer grabs ONE item via a foreach loop and then another message/producer comes in, the collection will hit the limit again and spawn another consumer task. I'm trying to avoid this. – Guy

2 Answers

0
votes

With this model all of the work is entirely serialized, which is to say you never have more than one "thing" working at a time: either the producer is working or the consumer is. Because of this, you don't really need a collection that is manipulated by both a producer and a consumer. Instead, you can have a producer that builds each batch as an ordinary collection, which the consumer then consumes once the batch is complete. It could look something like this:

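// Builds the next batch of items from the message.
public Task<List<Thing>> Produce(Message message)
{
    //...
}

// Processes one complete batch, e.g. with a single web service call.
public Task Consume(List<Thing> data)
{
    //...
}

public async Task MessageReceived(Message message)
{
    // HaveMoreBatches is assumed to report whether the message
    // still has unprocessed batches.
    while (HaveMoreBatches(message))
    {
        // The next batch is not produced until the previous one is consumed.
        await Consume(await Produce(message));
    }
}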

This lets you produce a batch, then consume it, then produce another batch, then consume it, etc. until there are no more batches to produce.

0
votes

Based on your vague description, I believe a double buffer is what you want.

Simply create two buffers. Producers write into one until it gets full or the timer runs out. At that point it gets "swapped" for the second one and producers start writing into the new one. The consumer then starts reading the first, now full, buffer.

This allows both the producers and the consumer to run at the same time, and it makes sure the consumer handles all previously created work as one batch before the loop repeats.
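
A minimal sketch of that swap, under a few assumptions: the class name DoubleBuffer<T> and its Add and Flush methods are made up for illustration, and instead of recycling exactly two buffers it allocates a fresh list on each swap, which is simpler to reason about. The point is that the check for "full" and the swap happen under the same lock, so only the one producer whose Add filled the buffer gets the batch back and can start the consumer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical double buffer: producers append to the active list; when it
// reaches the limit, the list is swapped out and handed back to the caller.
public sealed class DoubleBuffer<T>
{
    private readonly object _gate = new object();
    private readonly int _batchSize;
    private List<T> _active;

    public DoubleBuffer(int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
        _active = new List<T>(batchSize);
    }

    // Adds an item. Returns the full batch if this call filled the buffer,
    // otherwise null. A given batch is only ever returned to one caller.
    public List<T> Add(T item)
    {
        lock (_gate)
        {
            _active.Add(item);
            return _active.Count >= _batchSize ? SwapLocked() : null;
        }
    }

    // Intended for a timer tick: returns whatever has accumulated so far,
    // or null if the active buffer is empty.
    public List<T> Flush()
    {
        lock (_gate)
        {
            return _active.Count > 0 ? SwapLocked() : null;
        }
    }

    // Must be called while holding _gate.
    private List<T> SwapLocked()
    {
        List<T> full = _active;
        _active = new List<T>(_batchSize);
        return full;
    }
}
```

A producer would call Add and, only when the result is non-null, hand that list to whatever sends the batch to the web service; a timer would do the same with Flush. Note that this sketch does not by itself stop two batches from being processed at the same time if the buffer fills faster than the consumer finishes. If that matters, the returned batches could be queued for a single consumer loop.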