I'm trying to wrap my head around BlockingCollection and my producer/consumer problem.
What I want to achieve is the following:
- A thread-safe queue of sorts to hold a list of objects ("jobs") in FIFO manner.
- A second thread-safe queue, which holds a list of results of those jobs, also in FIFO manner.
In other words:
Inbound "Job" data can arrive at any time from multiple threads
==> Thread-Safe FIFO Queue 1 "FQ1"
==> Async Processing of data in FQ1 (and remove item from FQ1)
==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
==> Async Processing of data in FQ2 (and remove item from FQ2)
==> Done
My attempt so far:
    private BlockingCollection<InboundObject> fq1;
    private BlockingCollection<ResultObject> fq2;
    (...)
    Task.Factory.StartNew(() =>
    {
        // Blocks until an item is available in fq1, then removes and processes it.
        foreach (InboundObject a in fq1.GetConsumingEnumerable())
            a.DoWork(result => fq2.Add(result)); // DoWork reports its result through an Action<ResultObject> callback
    });
One of the reasons I chose BlockingCollection is that I want to keep load to a minimum: work should only happen when items are actually in the collection, without any polling or wait/sleep loops. I'm not sure whether foreach over GetConsumingEnumerable() is the correct approach for that.
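To illustrate what I expect from the foreach, here is a minimal, self-contained sketch using plain ints instead of my real job type. The class name ConsumingDemo and the method Run are made up for this example. As far as I understand it, GetConsumingEnumerable() blocks while the collection is empty and the loop only ends after CompleteAdding() has been called and the remaining items are drained:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    // Drains the queue on a background task and returns the items in the order they were seen.
    public static List<int> Run()
    {
        // The default backing store is a ConcurrentQueue<T>, so items come out FIFO.
        var queue = new BlockingCollection<int>();
        var seen = new List<int>();

        // LongRunning hints that the loop blocks and should get a dedicated thread.
        Task consumer = Task.Factory.StartNew(() =>
        {
            // Blocks (no polling, no Sleep) while the queue is empty.
            // The loop exits once CompleteAdding() was called and the queue is empty.
            foreach (int item in queue.GetConsumingEnumerable())
                seen.Add(item);
        }, TaskCreationOptions.LongRunning);

        for (int i = 1; i <= 3; i++)
            queue.Add(i);

        queue.CompleteAdding(); // signals that no more items will arrive
        consumer.Wait();
        return seen;
    }
}
```

With a single consumer, the items should come out in the order they were added.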
Please let me know if this is correct or if there is a better approach. Thanks!
Edit: Unit testing showed that the work inside the task was actually running synchronously. The new version is as follows:
    Task.Factory.StartNew(() =>
    {
        foreach (InboundObject a in fq1.GetConsumingEnumerable())
            // DoWork now returns an awaitable result; each job gets its own task.
            Task.Factory.StartNew(async () => { fq2.Add(await a.DoWork()); });
    });
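To make the question concrete, here is a minimal, self-contained sketch of the whole FQ1 to FQ2 pipeline as I currently picture it. Several things in it are assumptions for illustration only: the InboundObject and ResultObject bodies are stand-ins, DoWork is assumed to return Task<ResultObject>, and PipelineSketch, Run and ProcessAsync are made-up names. It also uses Task.Run instead of Task.Factory.StartNew, because Task.Run unwraps an async lambda, whereas StartNew with an async lambda returns a Task<Task> whose outer task completes at the first await:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real types.
public sealed class ResultObject { public int Value; }

public sealed class InboundObject
{
    public int Id;

    // Assumed signature: produces the result asynchronously.
    public async Task<ResultObject> DoWork()
    {
        await Task.Delay(10); // simulated work
        return new ResultObject { Value = Id * 2 };
    }
}

public static class PipelineSketch
{
    // Awaits one job and pushes its result into the second queue.
    private static async Task ProcessAsync(InboundObject job, BlockingCollection<ResultObject> output)
    {
        output.Add(await job.DoWork());
    }

    public static List<int> Run(int jobCount)
    {
        var fq1 = new BlockingCollection<InboundObject>();
        var fq2 = new BlockingCollection<ResultObject>();

        // Stage 1: take jobs from fq1, start each one, collect the running tasks.
        Task stage1 = Task.Run(async () =>
        {
            var running = new List<Task>();
            try
            {
                foreach (InboundObject job in fq1.GetConsumingEnumerable())
                    running.Add(ProcessAsync(job, fq2));
                await Task.WhenAll(running); // wait until every job has delivered its result
            }
            finally
            {
                fq2.CompleteAdding(); // lets stage 2 finish even if a job failed
            }
        });

        // Stage 2: consume results from fq2 as they arrive.
        var results = new List<int>();
        Task stage2 = Task.Run(() =>
        {
            foreach (ResultObject r in fq2.GetConsumingEnumerable())
                results.Add(r.Value);
        });

        // Producer side: in the real application this would happen from multiple threads.
        for (int i = 1; i <= jobCount; i++)
            fq1.Add(new InboundObject { Id = i });
        fq1.CompleteAdding();

        Task.WaitAll(stage1, stage2);
        return results;
    }
}
```

Note that in this sketch FQ2 is FIFO with respect to when jobs finish, not when they were submitted, because the jobs run concurrently. If results must come out in submission order, stage 1 would have to await each job before taking the next one.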
Input is very much appreciated!