1 vote

I'm writing a relatively simple "proxy" app for a web service. The general idea is that a TCP server (w/asynchronous connections) will read (string) data from clients and put that data (as part of the read callback function) into one of two queues (Q1 and Q2). Another thread will read the data in these queues and pass it to the web service. Data in Q1 should take priority over any data that might be in Q2.

I've been reading about the producer/consumer pattern, and it seems like that's pretty much what I'm trying to implement with regard to the queues. Since my enqueue and dequeue operations will happen on different threads, my queues will obviously need to be thread-safe and support some kind of locking mechanism. This is a .NET 4.0 application, and I saw the docs on the new BlockingCollection and ConcurrentQueue classes, but I'm not sure exactly what the difference is or how I'd use them in this scenario. Can anyone shed some more light on that? Thank you!


2 Answers

3 votes

I would do it with a class like the one below. You call Enqueue() when you produce an item, to add it to one of the queues; this method (almost) always returns immediately. On another thread, you call Dequeue() when you are ready to consume an item. It first tries to take from the high-priority queue; if no items are available in either queue, the call blocks. When you have finished producing, you call Complete(). Once that call has been made and both queues are empty, the next call (or a currently blocked call) to Dequeue() throws InvalidOperationException.

If your producer(s) can be faster than your consumer(s) for long periods of time, you should bound the queues (new BlockingCollection&lt;T&gt;(capacity)). But in that case, if a single thread produces both low- and high-priority items, it can block on a full low-priority queue, forcing high-priority items to wait behind low-priority ones. You could fix this by using one thread to produce high-priority items and another for low-priority items. Or you could bound only the high-priority queue and hope that you won't get a million low-priority items at once.

using System;
using System.Collections.Concurrent;

class Worker<T>
{
    private readonly BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>();
    private readonly BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>();

    public void Enqueue(T item, bool highPriority)
    {
        BlockingCollection<T> queue;
        if (highPriority)
            queue = m_highPriorityQueue;
        else
            queue = m_lowPriorityQueue;

        queue.Add(item);
    }

    public T Dequeue()
    {
        T result;

        // First try a non-blocking take, preferring the high-priority queue.
        if (!m_highPriorityQueue.IsCompleted)
        {
            if (m_highPriorityQueue.TryTake(out result))
                return result;
        }

        if (!m_lowPriorityQueue.IsCompleted)
        {
            if (m_lowPriorityQueue.TryTake(out result))
                return result;
        }

        if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted)
            throw new InvalidOperationException("All work is done.");
        else
        {
            // Both queues are empty but not yet completed; block until an
            // item arrives in either of them.
            try
            {
                BlockingCollection<T>.TakeFromAny(
                    new[] { m_highPriorityQueue, m_lowPriorityQueue },
                    out result);
            }
            catch (ArgumentException ex)
            {
                // TakeFromAny throws ArgumentException once the collections
                // have been marked complete for adding; translate that into
                // the same exception Dequeue() documents.
                throw new InvalidOperationException("All work is done.", ex);
            }

            return result;
        }
    }

    public void Complete()
    {
        m_highPriorityQueue.CompleteAdding();
        m_lowPriorityQueue.CompleteAdding();
    }
}
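
Hypothetical usage of the class above (the item type, thread setup, and messages are just for illustration; the statements would go inside a method such as Main()). The consumer thread loops on Dequeue() until Complete() has been called and both queues are drained, which surfaces as InvalidOperationException:

    var worker = new Worker<string>();

    Thread consumer = new Thread(() =>
    {
        while (true)
        {
            string item;
            try
            {
                item = worker.Dequeue();   // blocks while both queues are empty
            }
            catch (InvalidOperationException)
            {
                break;                     // Complete() was called and queues drained
            }
            Console.WriteLine(item);       // forward item to the web service here
        }
    });
    consumer.Start();

    worker.Enqueue("urgent request", highPriority: true);
    worker.Enqueue("background request", highPriority: false);
    worker.Complete();
    consumer.Join();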
1 vote

BlockingCollection uses ConcurrentQueue by default, so it should be a good fit for your application. It might be easier if you used F# with a Mailbox and async blocks; I posted a sample of a common implementation earlier:

Map/reduce with F# Agent or MailboxProcessor
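
To illustrate the first point (a sketch, separate from the F# post): BlockingCollection's default backing store is ConcurrentQueue&lt;T&gt;, and you can pass any other IProducerConsumerCollection&lt;T&gt; to change the ordering, or add a capacity to bound it:

    using System.Collections.Concurrent;

    // Default constructor: backed by ConcurrentQueue<T>, so Take() returns
    // items in FIFO order.
    var fifo = new BlockingCollection<int>();

    // Same blocking semantics, but LIFO order, via ConcurrentStack<T>.
    var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());

    // An optional bound: Add() blocks once 100 items are waiting.
    var bounded = new BlockingCollection<int>(new ConcurrentQueue<int>(), 100);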