9 votes

I was wondering: is it possible to have a lockless queue when more than one thread is reading or writing? I've seen a lockless queue implementation that worked with one reader thread and one writer thread, but never more than one of either. Is it possible? I don't think it is. Can anyone prove it either way?

6 Answers

14 votes

There are multiple algorithms available. I ended up implementing the one from An Optimistic Approach to Lock-Free FIFO Queues, which avoids the ABA problem via pointer tagging (this needs the CMPXCHG8B instruction on x86), and it runs fine in a production app (written in Delphi). (There is another version with Java code.)
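
For illustration only, here is a rough managed sketch of the same CAS-retry pattern (the classic Michael-Scott lock-free queue) in C#. It is not the tagged-pointer Delphi implementation described above, and the class and field names are made up for the example; in .NET the garbage collector already prevents a node from being reused while another thread still holds a reference to it, so no version tag is needed here.

using System.Threading;

// Minimal Michael-Scott-style lock-free queue sketch (illustrative only).
public class LockFreeQueue<T>
{
    private sealed class Node
    {
        public T Value;
        public Node Next;
    }

    private Node _head;
    private Node _tail;

    public LockFreeQueue()
    {
        var dummy = new Node();   // sentinel node: head == tail means the queue is empty
        _head = dummy;
        _tail = dummy;
    }

    public void Enqueue(T value)
    {
        var node = new Node { Value = value };
        while (true)
        {
            Node tail = Volatile.Read(ref _tail);
            Node next = Volatile.Read(ref tail.Next);
            if (tail != Volatile.Read(ref _tail)) continue;   // tail moved, retry
            if (next == null)
            {
                // Link the new node after the current last node.
                if (Interlocked.CompareExchange(ref tail.Next, node, null) == null)
                {
                    // Try to swing the tail forward; another thread may already have helped.
                    Interlocked.CompareExchange(ref _tail, node, tail);
                    return;
                }
            }
            else
            {
                // The tail is lagging; help move it forward, then retry.
                Interlocked.CompareExchange(ref _tail, next, tail);
            }
        }
    }

    public bool TryDequeue(out T value)
    {
        while (true)
        {
            Node head = Volatile.Read(ref _head);
            Node tail = Volatile.Read(ref _tail);
            Node next = Volatile.Read(ref head.Next);
            if (head != Volatile.Read(ref _head)) continue;   // head moved, retry
            if (head == tail)
            {
                if (next == null) { value = default(T); return false; }   // queue is empty
                // Tail is lagging behind an in-progress enqueue; help it along.
                Interlocked.CompareExchange(ref _tail, next, tail);
            }
            else
            {
                value = next.Value;
                // Claim the item by swinging the head to the next node.
                if (Interlocked.CompareExchange(ref _head, next, head) == head)
                    return true;
            }
        }
    }
}

The second CompareExchange in Enqueue (and the tail fix-up in TryDequeue) is the "helping" step: when the tail lags behind an in-progress enqueue, any thread can swing it forward, which is the same idea the Java answer below relies on.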

Nevertheless, to be truly lockless you would also need a lock-free memory allocator - see Scalable Lock-Free Dynamic Memory Allocation (implemented in Concurrent Building Block) or NBMalloc (though so far I haven't gotten around to using either of these).

You may also want to look at the answers to the question "optimistic lock-free FIFO queues impl?".

3 votes

Java's lockless queue implementation allows both concurrent reads and writes. The work is done with a compare-and-set (CAS) operation, which is a single CPU instruction.

The ConcurrentLinkedQueue uses an approach in which threads help each other read (poll) objects from the queue. Since it is a linked structure, the tail of the queue can accept writes (offers) while the head is serving reads (polls), and the queue is unbounded, so there is no capacity to run out of. All of this can happen in parallel and is completely thread-safe.

1 vote

With .NET 4.0, there is the ConcurrentQueue<T> class.
According to C# 4.0 in a Nutshell, it is a lock-free implementation. See also this blog entry.

0 votes

You don't specifically need a lock, just an atomic way of removing things from the queue. That is possible without a lock, using an atomic test-and-set (or compare-and-swap) instruction.
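
As a rough sketch of that idea in C# (using Interlocked.CompareExchange as the atomic instruction; the slot-array scheme and names are made up for the example):

using System.Threading;

static class AtomicTake
{
    // Remove an item from a shared slot without a lock: whoever wins the
    // compare-and-swap (replacing the current item with null) owns the item.
    public static bool TryTake<T>(T[] slots, int index, out T item) where T : class
    {
        T current = slots[index];
        if (current != null &&
            Interlocked.CompareExchange(ref slots[index], null, current) == current)
        {
            item = current;   // this thread won the race for the slot
            return true;
        }
        item = null;          // slot was empty or another thread got there first
        return false;
    }
}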

0 votes

There is a dynamic lock-free queue in the OmniThreadLibrary by Primoz Gabrijelcic (the Delphi Geek): http://www.thedelphigeek.com/2010/02/omnithreadlibrary-105.html

0 votes

With .NET 4.0, there is the ConcurrentQueue<T> class.

Sample

https://dotnetfiddle.net/ehLZCm

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        PopulateQueueParallel(new ConcurrentQueue<string>(), 500);
    }

    static void PopulateQueueParallel(ConcurrentQueue<string> queue, int queueSize)
    {
        // Enqueue from many threads at once; ConcurrentQueue needs no external locking.
        Parallel.For(0, queueSize, i => queue.Enqueue(string.Format("my message {0}", i)));

        // Dequeue in parallel; every TryDequeue should succeed because exactly
        // queueSize items were enqueued above.
        Parallel.For(0, queueSize,
            i =>
            {
                string message;
                bool success = queue.TryDequeue(out message);
                if (!success)
                    throw new Exception("Error!");

                Console.WriteLine(message);
            });
    }
}