Is it possible to have a lockless queue when more than one thread is reading or writing? I've seen a lockless queue implementation that worked with one reader thread and one writer thread, but never with more than one of either. Is it possible? I don't think it is. Can anyone prove it one way or the other?
6 Answers
There are multiple algorithms available. I ended up implementing the one from "An Optimistic Approach to Lock-Free FIFO Queues", which avoids the ABA problem via pointer tagging (this needs the CMPXCHG8B instruction on x86), and it runs fine in a production app (written in Delphi). (Another version, with Java code)
Nevertheless, to be truly lockless you would also need a lock-free memory allocator: see Scalable Lock-Free Dynamic Memory Allocation (implemented in Concurrent Building Block) or NBMalloc (so far I haven't had a chance to use either of these).
You may also want to look at the answers to "optimistic lock-free FIFO queues impl?"
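To make the pointer-tagging idea concrete, here is a small sketch in Java. It is not the Delphi implementation from this answer; it assumes `java.util.concurrent.atomic.AtomicStampedReference` as a stand-in for a pointer plus tag updated in one atomic step, and the class name `TaggedPointerDemo` is made up for illustration. It simulates the A -> B -> A sequence on a single thread so the outcome is deterministic.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

// Hypothetical demo class: shows why a tag (stamp) next to the pointer defeats ABA.
class TaggedPointerDemo {
    /** Returns {staleCasSucceeded, freshCasSucceeded}. */
    static boolean[] run() {
        String a = "A";
        String b = "B";
        // The stamp plays the role of the tag stored alongside the pointer.
        AtomicStampedReference<String> head = new AtomicStampedReference<>(a, 0);

        // "Thread 1" reads the reference and its tag, then gets delayed.
        int[] stampHolder = new int[1];
        String seen = head.get(stampHolder);
        int seenStamp = stampHolder[0];

        // Meanwhile "thread 2" changes A -> B -> A, bumping the tag each time.
        head.compareAndSet(a, b, 0, 1);
        head.compareAndSet(b, a, 1, 2);

        // The reference is A again, but the tag is now 2, so the stale CAS fails.
        boolean stale = head.compareAndSet(seen, b, seenStamp, seenStamp + 1);

        // Re-reading reference and tag, then retrying, succeeds.
        String fresh = head.get(stampHolder);
        boolean retried = head.compareAndSet(fresh, b, stampHolder[0], stampHolder[0] + 1);
        return new boolean[] { stale, retried };
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("stale CAS succeeded: " + result[0]);
        System.out.println("fresh CAS succeeded: " + result[1]);
    }
}
```

A plain compare-and-swap on the reference alone would have accepted the stale update, because the reference value looks unchanged. In a garbage-collected language the problem is less common, since a node cannot be recycled while someone still holds a reference to it; in Delphi or C++ the tag is what protects you.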
Java's implementation of a lockless queue allows multiple concurrent readers and writers. The work is done with a compare-and-set (CAS) operation, which maps to a single atomic CPU instruction on most common platforms.
The ConcurrentLinkedQueue uses a method in which threads help each other complete operations such as reading (polling) objects from the queue. Since it is linked, the tail of the queue can accept writes while the head of the queue can accept reads, and because it is unbounded there is no capacity limit to worry about. All of this can be done in parallel and is completely thread safe.
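As a rough illustration of the CAS-plus-helping approach described above, here is a minimal sketch of a Michael-Scott style linked queue in Java. This is not the JDK source for ConcurrentLinkedQueue; the class name `MSQueue` and the `stressTest` helper are made up for this example, and it relies on garbage collection so that nodes are never reused (which sidesteps ABA). `null` values are not supported because `dequeue` uses `null` to mean "empty".

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a multi-producer, multi-consumer lock-free queue.
class MSQueue<T> {
    private static final class Node<T> {
        final T value;
        final AtomicReference<Node<T>> next = new AtomicReference<>();
        Node(T value) { this.value = value; }
    }

    // head always points at a dummy node; the first real element is head.next.
    private final AtomicReference<Node<T>> head;
    private final AtomicReference<Node<T>> tail;

    MSQueue() {
        Node<T> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    /** Appends a non-null value at the tail. */
    void enqueue(T value) {
        Node<T> node = new Node<>(value);
        while (true) {
            Node<T> last = tail.get();
            Node<T> next = last.next.get();
            if (next != null) {
                // Tail is lagging: help the other thread by swinging it forward.
                tail.compareAndSet(last, next);
            } else if (last.next.compareAndSet(null, node)) {
                // Node is linked. Advancing tail may fail if someone already helped.
                tail.compareAndSet(last, node);
                return;
            }
        }
    }

    /** Removes and returns the value at the head, or null if the queue is empty. */
    T dequeue() {
        while (true) {
            Node<T> first = head.get();
            Node<T> last = tail.get();
            Node<T> next = first.next.get();
            if (next == null) return null;            // only the dummy is left
            if (first == last) {                      // tail lags behind a linked node
                tail.compareAndSet(last, next);
                continue;
            }
            // Exactly one thread wins this CAS, so each value is handed out once.
            if (head.compareAndSet(first, next)) return next.value;
        }
    }

    /** Runs producers and consumers concurrently; returns the sum of dequeued values. */
    static long stressTest(int threads, int perThread) {
        MSQueue<Integer> q = new MSQueue<>();
        int total = threads * perThread;
        AtomicInteger taken = new AtomicInteger();
        AtomicLong sum = new AtomicLong();
        Thread[] workers = new Thread[2 * threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 1; i <= perThread; i++) q.enqueue(i);
            });
            workers[threads + t] = new Thread(() -> {
                while (taken.get() < total) {
                    Integer v = q.dequeue();
                    if (v != null) { sum.addAndGet(v); taken.incrementAndGet(); }
                }
            });
        }
        for (Thread w : workers) w.start();
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("sum of dequeued values: " + stressTest(4, 10_000));
    }
}
```

No thread ever blocks another here: a failed CAS only means some other thread made progress, and the loop retries. For real code, prefer the library class over a hand-rolled queue.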
As of .NET 4.0, there is the ConcurrentQueue(T) class.
According to C# 4.0 in a Nutshell, this is a lock-free implementation. See also this blog entry.
You don't specifically need a lock, just an atomic way of deleting things from the queue. That is possible without a lock, using an atomic read-modify-write instruction such as compare-and-swap.
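Here is a short sketch of what "atomic deletion" buys you, using Java's `ConcurrentLinkedQueue` (the class name `AtomicRemovalDemo` and the `countDuplicates` helper are invented for this example). Several consumers call `poll()` at the same time, and the code counts how often the same item is handed to two threads. With atomic removal that count should stay at zero.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: several consumers drain one queue without any lock.
class AtomicRemovalDemo {
    /** Returns how many items were delivered to more than one consumer. */
    static int countDuplicates(int items, int consumers) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < items; i++) queue.offer(i);

        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger duplicates = new AtomicInteger();
        Thread[] workers = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            workers[c] = new Thread(() -> {
                Integer item;
                // poll() atomically removes and returns the head, or null when empty.
                while ((item = queue.poll()) != null) {
                    if (!seen.add(item)) duplicates.incrementAndGet();
                }
            });
            workers[c].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return duplicates.get();
    }

    public static void main(String[] args) {
        System.out.println("duplicate deliveries: " + countDuplicates(100_000, 8));
    }
}
```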
There is a dynamic lock free queue in the OmniThreadLibrary by Primoz Gabrijelcic (the Delphi Geek): http://www.thedelphigeek.com/2010/02/omnithreadlibrary-105.html
With .NET 4.0, there is the ConcurrentQueue<T> class.
Sample
https://dotnetfiddle.net/ehLZCm
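    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    public class Program
    {
        public static void Main()
        {
            PopulateQueueParallel(new ConcurrentQueue<string>(), 500);
        }

        static void PopulateQueueParallel(ConcurrentQueue<string> queue, int queueSize)
        {
            // Many threads enqueue concurrently; no explicit lock is needed.
            Parallel.For(0, queueSize, i => queue.Enqueue(string.Format("my message {0}", i)));

            // Many threads dequeue concurrently. Every TryDequeue must succeed
            // because exactly queueSize items were enqueued above.
            Parallel.For(0, queueSize, i =>
            {
                string message;
                if (!queue.TryDequeue(out message))
                    throw new Exception("Error!");
                Console.WriteLine(message);
            });
        }
    }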