0
votes

I have to implement communication between multiple threads (at least 3 for now) in .NET 3.5, and each of them is both a producer and a consumer. Instead of sending signals between each pair of threads, my idea is to implement a message queue that stores instances of a class like this:

public enum Signals { ObjectArrivedOnLightBarrier, ObjectLeftLightBarrier, CodeFound };
public enum UnitID { GrabThread, ImageProcessingThread, SaveThread };

// Consumer shows who the message is intended for (and only that unit is allowed to remove it from the queue)
public class QueuedSignal
{
    public Signals Message;   // Fields are public so that other threads can read them.
    public UnitID Producer;
    public UnitID Consumer;
}

The idea is that any thread can peek at the first item in the queue and leave it alone if the message is not intended for it (it is not an issue if there are a few other messages in the queue and one of them might be intended for this thread).

Is Queue<T> thread-safe when there are multiple producers and consumers?

2 Answers

2
votes

Queue<T> is not threadsafe.
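Because Queue<T> does no synchronization of its own, every access to it has to go through the same lock. A minimal sketch of that for the peek-then-remove pattern in the question might look like the following. The wrapper name SignalQueue and its TryDequeueFor method are hypothetical (not framework types), and the question's types are repeated with public members so the block compiles on its own. The peek and the dequeue happen under one lock, so no other thread can change the head in between:

```csharp
using System.Collections.Generic;

public enum Signals { ObjectArrivedOnLightBarrier, ObjectLeftLightBarrier, CodeFound }
public enum UnitID { GrabThread, ImageProcessingThread, SaveThread }

public class QueuedSignal
{
    public Signals Message;
    public UnitID Producer;
    public UnitID Consumer;
}

// Hypothetical wrapper: all access to the inner Queue<T> goes through one lock.
public sealed class SignalQueue
{
    private readonly Queue<QueuedSignal> _queue = new Queue<QueuedSignal>();
    private readonly object _sync = new object();

    public void Enqueue(QueuedSignal signal)
    {
        lock (_sync)
        {
            _queue.Enqueue(signal);
        }
    }

    // Removes the head only if it is addressed to 'consumer'.
    // Returns false if the queue is empty or the head is for another unit.
    public bool TryDequeueFor(UnitID consumer, out QueuedSignal signal)
    {
        lock (_sync)
        {
            if (_queue.Count > 0 && _queue.Peek().Consumer == consumer)
            {
                signal = _queue.Dequeue();
                return true;
            }
        }

        signal = null;
        return false;
    }
}
```

This sketch never blocks, so each thread has to poll it, and a message at the head holds up everything behind it until its consumer takes it.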

If you were using .Net 4 or later, I would recommend that you use BlockingCollection<T>.
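For comparison, this is roughly what a bounded producer/consumer looks like with BlockingCollection<T> on .NET 4 or later. It is only a sketch: the class name BlockingCollectionDemo, the capacity of 8 and the item count of 20 are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingCollectionDemo
{
    // Returns the number of items the consumer thread processed.
    public static int Run()
    {
        int consumed = 0;

        // Bounded to 8 items: Add() blocks while the collection is full.
        using (var queue = new BlockingCollection<string>(8))
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the collection is empty; the loop ends once
                // CompleteAdding() has been called and all items are drained.
                foreach (string item in queue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Consumed " + item);
                    ++consumed;
                }
            });
            consumer.Start();

            for (int i = 0; i < 20; ++i)
            {
                queue.Add(i.ToString());
            }

            queue.CompleteAdding();   // No more items will be added.
            consumer.Join();          // Wait for the consumer to drain the collection.
        }

        return consumed;
    }
}
```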

Unfortunately, you can't use that, but there are several implementations of a Concurrent Queue kicking around.

Have a look at this one in the answer from Marc Gravell. Unfortunately, that doesn't have a Peek() method.
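A blocking queue for .NET 3.5 can be built on Monitor.Wait and Monitor.PulseAll, and a non-blocking peek is easy to add to a queue of that kind. The following is a minimal sketch of that general technique, not the implementation from the linked answer; the class name SimpleBlockingQueue and its TryPeek method are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class SimpleBlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly int _maxSize;

    public SimpleBlockingQueue(int maxSize)
    {
        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException("maxSize");
        }

        _maxSize = maxSize;
    }

    // Blocks while the queue is full.
    public void Enqueue(T item)
    {
        lock (_queue)
        {
            // Wait() releases the lock while blocked and reacquires it before returning.
            while (_queue.Count >= _maxSize)
            {
                Monitor.Wait(_queue);
            }

            _queue.Enqueue(item);
            Monitor.PulseAll(_queue);   // Wake any threads waiting for an item.
        }
    }

    // Blocks while the queue is empty.
    public T Dequeue()
    {
        lock (_queue)
        {
            while (_queue.Count == 0)
            {
                Monitor.Wait(_queue);
            }

            T item = _queue.Dequeue();
            Monitor.PulseAll(_queue);   // Wake any threads waiting for space.
            return item;
        }
    }

    // Non-blocking peek: returns false if the queue is currently empty.
    public bool TryPeek(out T item)
    {
        lock (_queue)
        {
            if (_queue.Count > 0)
            {
                item = _queue.Peek();
                return true;
            }
        }

        item = default(T);
        return false;
    }
}
```

A peeked value is only a snapshot. By the time the caller acts on it, another thread may have dequeued the item, unless only one thread ever removes messages addressed to a given consumer.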

Here's a class I used to use before .Net 4 came out; perhaps this might be of interest to you. It's not the greatest implementation; we were using it more as a placeholder until .Net 4 came out. Even so, here it is:

/// <summary>A bounded blocking queue.</summary>
/// <typeparam name="T">The element type of the queue, which must be a reference type.</typeparam>

public sealed class BoundedBlockingQueue<T>: IDisposable where T: class
{
    #region Construction and disposal

    /// <summary>Constructor.</summary>
    /// <param name="maxQueueSize">
    /// The maximum size of the queue.
    /// Calls to <see cref="Enqueue"/> when the queue is full will block until at least one item has been removed.
    /// Calls to <see cref="Dequeue"/> when the queue is empty will block until a new item is enqueued.
    /// </param>

    public BoundedBlockingQueue(int maxQueueSize)
    {
        if (maxQueueSize <= 0)
        {
            throw new ArgumentOutOfRangeException("maxQueueSize");
        }

        _queue                  = new Queue<T>(maxQueueSize);
        _itemsAvailable         = new Semaphore(0, maxQueueSize);
        _spaceAvailable         = new Semaphore(maxQueueSize, maxQueueSize);
        _queueStopped           = new ManualResetEvent(false);
        _queueStoppedAndEmpty   = new ManualResetEvent(false);
        _stoppedOrItemAvailable = new WaitHandle[] { _queueStopped, _itemsAvailable };
    }

    /// <summary>Disposal.</summary>

    public void Dispose()
    {
        if (_itemsAvailable != null)
        {
            _itemsAvailable.Close();
            _spaceAvailable.Close();
            _queueStopped.Close();
            _queueStoppedAndEmpty.Close();
            _itemsAvailable = null;          // Use _itemsAvailable as a flag to indicate that Dispose() has been called.
        }
    }

    #endregion Construction and disposal

    #region Public properties

    /// <summary>The number of items currently in the queue.</summary>

    public int Count
    {
        get
        {
            throwIfDisposed();

            lock (_queue)
            {
                return _queue.Count;
            }
        }
    }

    /// <summary>Has <see cref="Stop"/> been called?</summary>

    public bool Stopped
    {
        get
        {
            throwIfDisposed();
            return _stopped;
        }
    }

    #endregion Public properties

    #region Public methods

    /// <summary>
    /// Signals that new items will no longer be placed into the queue.
    /// After this is called, calls to <see cref="Dequeue"/> will return null immediately if the queue is empty.
    /// Before this is called, calls to <see cref="Dequeue"/> will block if the queue is empty.
    /// Attempting to enqueue items after this has been called will cause an exception to be thrown.
    /// </summary>
    /// <remarks>
    /// If you use a different thread to enqueue items than the thread that calls Stop() you might get a race condition.
    /// 
    /// If the queue is full and a thread calls Enqueue(), that thread will block until space becomes available in the queue.
    /// If a different thread then calls Stop() while the other thread is blocked in Enqueue(), the item enqueued by the other
    /// thread may become lost since it will be enqueued after the special null value used to indicate the end of the
    /// stream is enqueued.
    /// 
    /// To prevent this from happening, you must enqueue from the same thread that calls Stop(), or provide another
    /// synchronisation mechanism to avoid this race condition.
    /// </remarks>

    public void Stop()
    {
        throwIfDisposed();

        lock (_queue)
        {
            _queueStopped.Set();
            _stopped = true;
        }
    }

    /// <summary>
    /// Returns the front item of the queue without removing it, or null if the queue is currently empty.
    /// A null return does NOT indicate that <see cref="Stop"/> has been called.
    /// This never blocks.
    /// </summary>
    /// <returns>The front item of the queue, or null if the queue is empty.</returns>

    public T Peek()
    {
        throwIfDisposed();
        T result;

        lock (_queue)
        {
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
            }
            else
            {
                result = null;
            }
        }

        return result;
    }

    /// <summary>
    /// Enqueues a new non-null item.
    /// If there is no room in the queue, this will block until there is room.
    /// An exception will be thrown if <see cref="Stop"/> has been called.
    /// </summary>
    /// <param name="item">The item to be enqueued. This may not be null.</param>

    public void Enqueue(T item)
    {
        throwIfDisposed();

        if (item == null)
        {
            throw new ArgumentNullException("item");
        }

        if (_stopped)
        {
            throw new InvalidOperationException("Attempting to enqueue an item to a stopped queue.");
        }

        this.enqueue(item);
    }

    /// <summary>
    /// Dequeues the next available item.
    /// If <see cref="Stop"/> has been called, this returns null if the queue is empty.
    /// Otherwise it blocks until an item becomes available (or <see cref="Stop"/> is called).
    /// </summary>
    /// <returns>The next available item, or null if the queue is empty and stopped.</returns>

    public T Dequeue()
    {
        throwIfDisposed();

        if (_isQueueStoppedAndEmpty)
        {
            return null;
        }

        WaitHandle.WaitAny(_stoppedOrItemAvailable);

        lock (_queue)
        {
            if (_stopped && (_queue.Count == 0))
            {
                _isQueueStoppedAndEmpty = true;
                _queueStoppedAndEmpty.Set();
                return null;
            }
            else
            {
                T item = _queue.Dequeue();
                _spaceAvailable.Release();
                return item;
            }
        }
    }

    /// <summary>Waits forever for the queue to become empty AND stopped.</summary>

    public void WaitUntilStoppedAndEmpty()
    {
        throwIfDisposed();
        WaitUntilStoppedAndEmpty(Timeout.Infinite);
    }

    /// <summary>Waits up to the specified time for the queue to become empty AND stopped.</summary>
    /// <param name="timeoutMilliseconds">The maximum wait time, in milliseconds.</param>
    /// <returns>True if the wait succeeded, false if it timed-out.</returns>

    public bool WaitUntilStoppedAndEmpty(int timeoutMilliseconds)
    {
        throwIfDisposed();
        return _queueStoppedAndEmpty.WaitOne(timeoutMilliseconds);
    }

    #endregion Public methods

    #region Private methods

    /// <summary>Enqueues a new item (which may be null to indicate the last item to go into the queue).</summary>
    /// <param name="item">An item to enqueue.</param>

    private void enqueue(T item)
    {
        _spaceAvailable.WaitOne();

        lock (_queue)
        {
            _queue.Enqueue(item);
        }

        _itemsAvailable.Release();
    }

    /// <summary>Throws if this object has been disposed.</summary>

    private void throwIfDisposed()
    {
        if (_itemsAvailable == null)
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #endregion Private methods

    #region Fields

    /// <summary>
    /// Contains wait handles for "stopped" and "item available".
    /// Therefore using this for WaitAny() will wait until the queue is stopped
    /// or an item becomes available, whichever is the sooner.
    /// </summary>

    private readonly WaitHandle[] _stoppedOrItemAvailable;

    private Semaphore _itemsAvailable;

    private volatile bool _stopped;
    private volatile bool _isQueueStoppedAndEmpty;

    private readonly Queue<T> _queue;
    private readonly Semaphore _spaceAvailable;
    private readonly ManualResetEvent _queueStopped;
    private readonly ManualResetEvent _queueStoppedAndEmpty;

    #endregion Fields
}

And here's an old unit test for it. It's not a very good unit test; it tests too many things at once and has some other problems, but it will demonstrate how to use the queue:

[TestMethod]

public void TestBoundedBlockingQueue()
{
    int maxQueueSize = 8;

    using (var queue = new BoundedBlockingQueue<string>(maxQueueSize))
    {
        // Fill the queue, but don't block.

        for (int i = 0; i < maxQueueSize; ++i)
        {
            int start1 = DateTimeFunctions.TickCount;
            queue.Enqueue(i.ToString());
            int elapsed1 = DateTimeFunctions.TickCount - start1;
            Assert.IsTrue(elapsed1 < 100, "Took too long to enqueue an item.");  // Shouldn't have taken more than 100 ms to enqueue the item.
        }

        // Now if we try to enqueue something we should block (since the queue should be full).
        // We can detect this by starting a thread that will dequeue something in a few seconds
        // and then seeing how long the main thread took to enqueue something.
        // It should have taken around the thread sleep time (+/- half a second or so).

        int sleepTime = 2500;
        int tolerance = 500;
        Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Dequeue();}, "TestBoundedBlockingQueue Dequeue()");
        int start2 = DateTimeFunctions.TickCount;
        queue.Enqueue(maxQueueSize.ToString());
        int elapsed2 = DateTimeFunctions.TickCount - start2;
        Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to enqueue an item.");

        // Now verify that the remaining items in the queue are the expected ones,
        // i.e. from "1".."maxQueueSize" (since the first item, "0", has been dequeued).

        for (int i = 1; i <= maxQueueSize; ++i)
        {
            Assert.AreEqual(i.ToString(), queue.Dequeue(), "Incorrect item dequeued.");
        }

        Assert.AreEqual(0, queue.Count);

        // Now if we try to dequeue something we should block (since the queue is empty).
        // We can detect this by starting a thread that will enqueue something after sleepTime milliseconds
        // and then seeing how long the main thread took to dequeue something.
        // It should have taken around the thread sleep time (+/- half a second or so).

        string testValue = "TEST";
        Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Enqueue(testValue);}, "TestBoundedBlockingQueue Enqueue()");
        start2 = DateTimeFunctions.TickCount;
        Assert.AreEqual(testValue, queue.Dequeue(), "Incorrect item dequeued");
        elapsed2 = DateTimeFunctions.TickCount - start2;
        Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to dequeue an item.");
    }
}
1
votes

The June 2008 CTP of the Parallel Extensions for .NET included a BlockingCollection<T> class, which would do what you want, although it might not have had a Peek method. That library worked with .NET 3.5, and I used it quite a lot.

I've been unable to find a place to download it, but a little searching might turn it up.

It might be available in the Reactive Extensions. The newer version of Rx is for .NET 4.5, but there's an older version available at http://www.microsoft.com/en-us/download/details.aspx?id=28568