2
votes

I'm trying to implement a concurrent producer-consumer collection (multiple producers and consumers) that supports timeouts for consumers.

Now the actual collection is pretty complicated (unfortunately, nothing in System.Collections.Concurrent does the job), but I have a minimal sample here that demonstrates my problem (it looks a bit like BlockingCollection&lt;T&gt;).

public sealed class ProducerConsumerQueueDraft<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object locker = new object();

    public void Enqueue(T item)
    {
        lock (locker)
        {
            queue.Enqueue(item);

            /* This "optimization" is broken, as Nicholas Butler points out.
            if(queue.Count == 1) // Optimization
            */
                Monitor.Pulse(locker); // Notify any waiting consumer threads.
        }
    }

    public T Dequeue()
    {
        lock (locker)
        {
            // Surprisingly, this needs to be a *while* and not an *if*
            // which is the core of my problem.
            while (queue.Count == 0)
                Monitor.Wait(locker);

            return queue.Dequeue();
        }
    }

    // This isn't thread-safe, but is how I want TryDequeue to look.
    public bool TryDequeueDesired(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
            {
                item = default(T);
                return false;
            }

            // This is wrong! The queue may be empty even though we were pulsed!
            item = queue.Dequeue();
            return true;
        }
    }

    // Has nasty timing-gymnastics I want to avoid.
    public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            var watch = Stopwatch.StartNew();
            while (queue.Count == 0)
            {
                var remaining = timeout - watch.Elapsed;

                if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
                {
                    item = default(T);
                    return false;
                }
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

The idea is straightforward: consumers who find an empty queue wait to be signaled, and producers Pulse (note: not PulseAll, which would be inefficient) them to notify them of a waiting item.

My problem is this property of Monitor.Pulse:

When the thread that invoked Pulse releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock.

What this means is that consumer-thread C1 could be woken up by a producer-thread to consume an item, but another consumer-thread C2 could acquire the lock before C1 has a chance to reacquire it, and consume the item, leaving C1 with an empty queue when it is given control.

This means I have to defensively check, on every pulse, whether the queue is indeed non-empty in the consumer code, and go back to waiting empty-handed if it is not.

My primary issue with this is that it is inefficient - threads may be woken up to do work and then promptly sent back to wait again. A related consequence is that implementing a TryDequeue with a timeout is unnecessarily difficult and inefficient (see TryDequeueThatWorks) when it should be elegant (see TryDequeueDesired).

How can I twist Monitor.Pulse to do what I want? Alternatively, is there another synchronization primitive that does? Is there a more efficient and/or elegant way to implement a TryDequeue timeout than what I have done?

FYI, here's a test that demonstrates the issues with my desired solution:

var queue = new ProducerConsumerQueueDraft<int>();

for (int consumer = 0; consumer < 3; consumer++)
    new Thread(() =>
    {
        while (true)
        {
            int item;

            // This call should occasionally throw an exception.
            // Switching to queue.TryDequeueThatWorks should make
            // the problem go away.
            if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
            {
                // Do nothing.
            }
        }

    }).Start();

Thread.Sleep(1000); // Let consumers get up and running

for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
    queue.Enqueue(0);
}
3
@dtb: I know of the method. What's your point? You want me to look at the implementation? It appears to be using SemaphoreSlim, but I don't know exactly how it (and all of BlockingCollection) works. – Ani
From your requirements I don't understand why that method doesn't work for you. – dtb
@dtb: I'm not using BlockingCollection. I'm implementing a producer-consumer collection with some complex characteristics (FYI, it's a key-based conflated queue). This is just a minimal sample that demonstrates my issue - it's not my actual code. – Ani
Ah, I see. Are you sure BlockingCollection can't be twisted in some way to make it work for you? For example, by combining it with a ConcurrentDictionary for the conflation part? Monitor.Pulse/Wait is a very fragile synchronization primitive; I recommend you use a different mechanism like ManualResetEventSlim, SemaphoreSlim, etc. But it's difficult to give a recommendation, because I wouldn't recommend building the message queue from scratch, only from existing parts. – dtb

3 Answers

2
votes

My primary issue with this is that it inefficient

It is not. You assume that this is a common occurrence, but this kind of race happens very rarely - once in a blue moon, at best. The while loop is necessary to ensure nothing goes wrong when it does occur. And it will. Don't mess with it.

It is in fact the opposite: the lock design is efficient precisely because it allows a race to occur, and then deals with it. Tinkering with locking designs is very dangerous because the races don't happen frequently enough. They are horribly random, which prevents sufficient testing to prove that an alteration doesn't cause failure. Adding instrumentation code doesn't work either; it alters the timing.

1
votes

I wrote an article about this that may help:

Thread synchronization: Wait and Pulse demystified

In particular, it explains why a while loop is necessary.
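To make that concrete, here is a minimal, self-contained sketch of the canonical while-loop pattern with two consumers and one producer (the class and field names are made up for illustration). Even if the "wrong" consumer wins the race for the lock after a Pulse, the loop makes the wakeup harmless: the losing thread re-checks the condition and simply waits again instead of dequeuing from an empty queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class WhileLoopDemo
{
    static readonly Queue<int> queue = new Queue<int>();
    static readonly object locker = new object();

    // Canonical consumer: re-check the condition after *every* wakeup.
    static int Dequeue()
    {
        lock (locker)
        {
            // If another consumer grabbed the item between the Pulse and
            // this thread reacquiring the lock, Count is 0 again and we
            // go back to waiting instead of throwing.
            while (queue.Count == 0)
                Monitor.Wait(locker);
            return queue.Dequeue();
        }
    }

    static void Main()
    {
        var results = new List<int>();
        var consumers = new Thread[2];
        for (int i = 0; i < consumers.Length; i++)
        {
            consumers[i] = new Thread(() =>
            {
                int item = Dequeue();
                lock (results) results.Add(item);
            });
            consumers[i].Start();
        }

        // Producer: one Pulse per item.
        for (int i = 0; i < consumers.Length; i++)
        {
            lock (locker)
            {
                queue.Enqueue(i);
                Monitor.Pulse(locker);
            }
        }

        foreach (var t in consumers) t.Join();
        Console.WriteLine(results.Count); // 2: each item consumed exactly once
    }
}
```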

1
votes

Here's a simple key-based conflating producer-consumer queue:

public class ConflatingConcurrentQueue<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Entry> entries;
    private readonly BlockingCollection<Entry> queue;

    public ConflatingConcurrentQueue()
    {
        this.entries = new ConcurrentDictionary<TKey, Entry>();
        this.queue = new BlockingCollection<Entry>();
    }

    public void Enqueue(TValue value, Func<TValue, TKey> keySelector)
    {
        // Get the entry for the key. Create a new one if necessary.
        Entry entry = entries.GetOrAdd(keySelector(value), k => new Entry());

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Replace any old value with the new one.
            entry.Value = value;

            // Add the entry to the queue if it's not enqueued yet.
            if (!entry.Enqueued)
            {
                entry.Enqueued = true;
                queue.Add(entry);
            }
        }
    }

    public bool TryDequeue(out TValue value, TimeSpan timeout)
    {
        Entry entry;

        // Try to dequeue an entry (with timeout).
        if (!queue.TryTake(out entry, timeout))
        {
            value = default(TValue);
            return false;
        }

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Return the value.
            value = entry.Value;

            // Mark the entry as dequeued.
            entry.Enqueued = false;
            entry.Value = default(TValue);
        }

        return true;
    }

    private class Entry
    {
        public TValue Value { get; set; }
        public bool Enqueued { get; set; }
    }
}

(This may need a code review or two, but I think in general it's sane.)
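For illustration, a quick usage sketch against the class above (the string key and int values here are made up; the key selector decides which values conflate with each other):

```csharp
// Assumes the ConflatingConcurrentQueue<TKey, TValue> defined above.
var queue = new ConflatingConcurrentQueue<string, int>();

queue.Enqueue(1, v => "price");
queue.Enqueue(2, v => "price"); // same key: conflates, replacing the pending 1

int value;
bool first = queue.TryDequeue(out value, TimeSpan.Zero);  // true, value == 2
bool second = queue.TryDequeue(out value, TimeSpan.Zero); // false: queue empty
```

Because both values map to the same key, the second Enqueue overwrites the entry's value rather than adding a second queue slot, so consumers only ever see the latest value per key.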