I would do it like the following class. You call Enqueue()
when you produce an item to add it to one of the queues. This method always returns (almost) immediately. In another thread, you call Dequeue()
when you are ready to consume an item. It tries to first take from high priority queue. If no items are available at the moment in any of the queues, the call blocks. When you finished producing, you call Complete()
. After that call has been made and both queues are empty, the next call (or the current blocked call) to Dequeue()
throws InvalidOperationException
.
If your producer(s) can be faster than your consumer(s) for long periods of time, you should bound the queues (new BlockingCollection<T>(capacity)
). But in this case, if you have only one thread that produces both low and high priority items, it's possible that high priority items would have to wait for low priority items. You could fix this by having one thread for producing high priority items and one for low priority items. Or you could bound only high priority queue and hope that you won't get one million low priority items at once.
class Worker<T>
{
BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>();
BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>();
public void Enqueue(T item, bool highPriority)
{
BlockingCollection<T> queue;
if (highPriority)
queue = m_highPriorityQueue;
else
queue = m_lowPriorityQueue;
queue.Add(item);
}
public T Dequeue()
{
T result;
if (!m_highPriorityQueue.IsCompleted)
{
if (m_highPriorityQueue.TryTake(out result))
return result;
}
if (!m_lowPriorityQueue.IsCompleted)
{
if (m_lowPriorityQueue.TryTake(out result))
return result;
}
if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted)
throw new InvalidOperationException("All work is done.");
else
{
try
{
BlockingCollection<T>.TakeFromAny(
new[] { m_highPriorityQueue, m_lowPriorityQueue },
out result);
}
catch (ArgumentException ex)
{
throw new InvalidOperationException("All work is done.", ex);
}
return result;
}
}
public void Complete()
{
m_highPriorityQueue.CompleteAdding();
m_lowPriorityQueue.CompleteAdding();
}
}