6
votes

I'm working on a project with the following workflow:

Part One:

  • An event arrives asynchronously and is queued on a blocking queue; we'll call that Q1.
  • A thread picks up the next available item from that queue.
  • The item ends up running N tasks in parallel.
  • Each task queues its result on a second queue; we'll call that Q2.
  • When processing of the item finishes, the next item is read off Q1.

Part Two:

  • Another thread reads one object at a time off Q2 and works on the result.

So, the problem is this: every item on the first queue ends up running a large number of tasks in parallel, and each task queues its result. The second queue, however, must be processed serially, one item at a time, and it's getting flooded.


My Question

I need a mechanism that will make the thread processing Q1 wait until the number of items in Q2 falls below a specific threshold. What's the best way to achieve this? Is there any way to have an event-driven solution rather than a polling solution?


3 Answers

8
votes

Instead of using a Queue<T>, you can use a BlockingCollection<T> for Q2. If you set its BoundedCapacity, calls to Q2.Add() will block when the capacity is reached. This will automatically throttle the processing of Q1, as the N tasks will begin blocking if they can't add to the final queue.
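A minimal sketch of this idea (the capacity of 10 and the item counts are placeholder values):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BoundedQueueDemo
{
    static void Main()
    {
        // Q2 with a bounded capacity of 10: Add() blocks once 10 items are queued.
        var q2 = new BlockingCollection<int>(boundedCapacity: 10);

        // Producer side (stands in for the N tasks spawned per Q1 item).
        // Add() throttles automatically when Q2 is full.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 100; i++)
                q2.Add(i);          // blocks while q2 already holds 10 items
            q2.CompleteAdding();    // signal that no more items will arrive
        });

        // Consumer side (the single Q2 worker) drains items one at a time.
        foreach (var item in q2.GetConsumingEnumerable())
            Console.WriteLine(item);

        producer.Wait();
    }
}
```

Because the blocking happens inside `Add()`, no polling is needed: the producers simply sleep until the consumer makes room.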

2
votes

I am assuming that you receive data in occasional floods, with long droughts during which Q2 can catch up. Have you considered simply limiting the number of concurrent threads spawned from Q1 by using a limited thread pool for these tasks?

I suspect you could benefit from multiple thread pools if the job size is easy to determine on arrival: a small number of threads to process large jobs, and a large number of threads ready to process small jobs. Even a third, intermediate queue might be beneficial.
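One simple way to cap the number of concurrent per-item tasks, sketched with a `SemaphoreSlim` as the gate (the limit of 4 is a placeholder; `ThrottledSpawner` and `RunThrottled` are hypothetical names):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledSpawner
{
    // At most 4 of the per-item tasks may run at once (assumed limit).
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(4);

    public static Task RunThrottled(Action work)
    {
        return Task.Run(async () =>
        {
            await Gate.WaitAsync();   // wait for a free slot
            try { work(); }
            finally { Gate.Release(); }
        });
    }
}
```

The Q1 thread would spawn its N tasks through `RunThrottled`, so even a large N never occupies more than the configured number of workers at once.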

1
votes

Your problem seems like a perfect fit for the TPL Dataflow library. If you are willing to try it, here's how it could work (this is a very simple example, of course):

var transform = new TransformBlock<int, bool>(i => i > 5,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
transform.LinkTo(resultBlock);

You define a transform block that performs your transformation (this plays the role of Q1); you can set its degree of parallelism to the number of tasks you want used.

Then you create a second block (playing the role of Q2) with BoundedCapacity set; it processes every message synchronously, invoking an action for each element. This block could be replaced with any other, such as a BufferBlock, which would let you poll from it on demand.
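Putting the pieces together, a runnable sketch of feeding and draining such a pipeline might look like this (it requires the System.Threading.Tasks.Dataflow NuGet package; the 20-item loop is a placeholder workload, and `PropagateCompletion` is added so that completion flows through to the action block):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowDemo
{
    static async Task Main()
    {
        var transform = new TransformBlock<int, bool>(i => i > 5,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
        var resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        // PropagateCompletion lets Complete() flow through to resultBlock.
        transform.LinkTo(resultBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 20; i++)
            await transform.SendAsync(i);   // SendAsync waits when the pipeline is full

        transform.Complete();               // no more input
        await resultBlock.Completion;       // wait for the pipeline to drain
    }
}
```

Note that `SendAsync` (unlike `Post`, which simply returns false when a block declines a message) asynchronously waits for room, which gives you the event-driven back-pressure the question asks for.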