76
votes

I'm in the process of designing a system which connects to one or more streams of data feeds, does some analysis on the data, and then triggers events based on the results. In a typical multi-threaded producer/consumer setup, I will have multiple producer threads putting data into a queue, and multiple consumer threads reading the data, and the consumers are only interested in the latest data point plus n number of points. The producer threads will have to block if a slow consumer can not keep up, and of course the consumer threads will block when there are no unprocessed updates. Using a typical concurrent queue with a reader/writer lock will work nicely, but the rate of data coming in could be huge, so I want to reduce my locking overhead, especially the writer locks for the producers. I think a circular lock-free buffer is what I need.

Now two questions:

  1. Is a circular lock-free buffer the answer?

  2. If so, before I roll my own, do you know any public implementation that will fit my need?

Any pointers on implementing a circular lock-free buffer are always welcome.

BTW, doing this in C++ on Linux.

Some additional info:

The response time is critical for my system. Ideally the consumer threads will want to see any updates coming in as soon as possible because an extra 1 millisecond delay could make the system worthless, or worth a lot less.

The design idea I'm leaning toward is a semi-lock-free circular buffer where the producer thread puts data into the buffer as fast as it can; let's call the head of the buffer A. The producer does not block unless the buffer is full, i.e. when A meets the end of the buffer, Z. Consumer threads will each hold two pointers into the circular buffer, P and Pn, where P is the thread's local buffer head and Pn is the nth item after P. Each consumer thread advances its P and Pn once it finishes processing the current P, and the end-of-buffer pointer Z is advanced with the slowest Pn. When P catches up to A, which means there are no more new updates to process, the consumer spins and busy-waits for A to advance again. If a consumer thread spins for too long, it can be put to sleep and wait on a condition variable, but I'm okay with consumers taking up CPU cycles waiting for updates, because that does not increase my latency (I'll have more CPU cores than threads). Imagine you have a circular track: the producer is running in front of a bunch of consumers, and the key is to tune the system so that the producer is usually running just a few steps ahead of the consumers, with most of these operations done using lock-free techniques. I understand that getting the details of the implementation right is not easy... okay, very hard, and that's why I want to learn from others' mistakes before making a few of my own.
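
Something like this rough, completely untested sketch is what I have in mind (all names are made up; I've left out the Pn history pointer and the condition-variable fallback for brevity):

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

struct ring
{
    static const size_t N = 1024;                 // power of two
    int64_t slots[N];
    std::atomic<uint64_t> A{0};                   // producer head: next write position
    std::vector<std::atomic<uint64_t>*> cursors;  // one P per consumer thread

    uint64_t slowest_P() const
    {
        uint64_t m = UINT64_MAX;
        for (auto* c : cursors)
            m = std::min(m, c->load(std::memory_order_acquire));
        return m;  // Z is derived from the slowest consumer's P
    }

    void publish(int64_t v)  // single producer thread only
    {
        uint64_t h = A.load(std::memory_order_relaxed);
        while (h - slowest_P() >= N) { }  // buffer full: A has met Z, spin
        slots[h % N] = v;
        A.store(h + 1, std::memory_order_release);  // make v visible to consumers
    }

    int64_t consume(std::atomic<uint64_t>& P)  // each consumer owns its own P
    {
        uint64_t p = P.load(std::memory_order_relaxed);
        while (p == A.load(std::memory_order_acquire)) { }  // caught up to A: busy wait
        int64_t v = slots[p % N];
        P.store(p + 1, std::memory_order_release);  // advance P, releasing the slot
        return v;
    }
};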

18
I think it would be helpful if you sketch the API that you want this data structure to implement. – Dave
Something I learned is to take big chunks of work. I don't know the size of your work items, but you can increase efficiency if you can produce bigger chunks and consume bigger chunks. You can also increase it by consuming variable-size chunks so the consumers don't all finish at once and contend on the data queue. – Zan Lynx
Another thing to think about is whether you need one buffer or a series of buffers. You could have producer/consumer pairs sharing one buffer, and when a buffer is full, the producer or consumer temporarily switches to another open buffer. It's a form of work stealing. – Zan Lynx
Efficient lock-free algorithms are unique snowflakes whose discovery usually merits a research article. I'm not going to attempt to answer this question until the OP distinguishes his actual requirements from what he thinks a solution should look like. – Dave
A millisecond is a very fast timing deadline on unmodified Linux. If another process gets to run, then you could easily miss it. You would need to use real-time priorities, and even then I'm not sure that you can reliably meet those deadlines. Are you sure that you need to be that responsive? Can you make just the producers that fast, implement them in e.g. a device driver, and relax the requirements on the consumers? – Doug

18 Answers

42
votes

I've made a particular study of lock-free data structures over the last couple of years. I've read most of the papers in the field (there are only about forty or so - although only about ten or fifteen are of any real use :-)

AFAIK, a lock-free circular buffer has not been invented. The problem would be dealing with the complex condition where a reader overtakes a writer, or vice versa.

If you have not spent at least six months studying lock-free data structures, do not attempt to write one yourself. You will get it wrong and it may not be obvious to you that errors exist, until your code fails, after deployment, on new platforms.

I believe however there is a solution to your requirement.

You should pair a lock-free queue with a lock-free free-list.

The free-list will give you pre-allocation and so obviate the (costly) requirement for a lock-free allocator; when the free-list is empty, you replicate the behaviour of a circular buffer by instantly dequeuing an element from the queue and using that instead.

(Of course, in a lock-based circular buffer, once the lock is obtained, obtaining an element is very quick - basically just a pointer dereference - but you won't get that in any lock-free algorithm; they often have to go well out of their way to do things; the overhead of a failed free-list pop followed by a dequeue is on a par with the amount of work any lock-free algorithm will need to do.)

Michael and Scott developed a really good lock-free queue back in 1996. The link below will give you enough details to track down the PDF of their paper: Michael and Scott, FIFO

A lock-free free-list is the simplest lock-free algorithm, and in fact I don't think I've ever seen an actual paper for it.
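
To illustrate, here is a bare-bones sketch of such a free-list - essentially a Treiber stack over pre-allocated nodes (my own sketch, not from a paper). Note the naive pop below is exposed to the ABA problem; a production version needs ABA protection, such as a pointer-counter pair or hazard pointers:

#include <atomic>

struct node
{
    node* next;
    // ... payload ...
};

class free_list
{
    std::atomic<node*> head{nullptr};

public:
    void push(node* n)  // return a node to the free-list
    {
        node* old = head.load(std::memory_order_relaxed);
        do {
            n->next = old;
        } while (!head.compare_exchange_weak(old, n,
                     std::memory_order_release,
                     std::memory_order_relaxed));
    }

    node* pop()  // take a node, or nullptr if the free-list is empty
    {
        node* old = head.load(std::memory_order_acquire);
        // ABA hazard: old->next may be stale if 'old' was popped and
        // pushed back by another thread between our load and the CAS.
        while (old && !head.compare_exchange_weak(old, old->next,
                          std::memory_order_acquire,
                          std::memory_order_acquire))
            ;  // on failure, compare_exchange_weak reloads 'old'
        return old;
    }
};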

34
votes

The term of art for what you want is a lock-free queue. There's an excellent set of notes with links to code and papers by Ross Bencina. The guy whose work I trust the most is Maurice Herlihy (for Americans, he pronounces his first name like "Morris").

11
votes

The requirement that producers or consumers block if the buffer is empty or full suggests that you should use a normal locking data structure, with semaphores or condition variables to make the producers and consumers block until data is available. Lock-free code generally doesn't block on such conditions - it spins or abandons operations that can't be done, instead of blocking using the OS. (If you can afford to wait until another thread produces or consumes data, then why is waiting for another thread to finish updating the data structure any worse?)

On (x86/x64) Linux, inter-thread synchronization using mutexes is reasonably cheap if there is no contention. Concentrate on minimizing the time that the producers and consumers need to hold their locks. Given that you've said you only care about the last N recorded data points, I think a circular buffer would do this reasonably well. However, I don't really understand how this fits in with the blocking requirement and the idea of consumers actually consuming (removing) the data they read. (Do you want consumers to only look at the last N data points, and not remove them? Do you want producers to not care whether consumers can keep up, and just overwrite old data?)

Also, as Zan Lynx commented, you can aggregate/buffer your data into bigger chunks when you've got lots of it coming in. You could buffer up a fixed number of points, or all the data received within a certain amount of time. This means there will be fewer synchronization operations. It does introduce latency, but if you're not using real-time Linux, then you'll have to deal with some latency anyway.
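
For illustration, a minimal sketch of that locking approach, assuming consumers each take a snapshot of the last N points rather than removing them (all names are invented):

#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>

template <typename T>
class latest_n_buffer
{
    std::mutex m;
    std::condition_variable cv;
    std::deque<T> points;   // the last N points, newest at the back
    const size_t max_points;
    uint64_t version = 0;   // bumped on every new point

public:
    explicit latest_n_buffer(size_t n) : max_points(n) {}

    void produce(T v)  // never blocks waiting for consumers
    {
        {
            std::lock_guard<std::mutex> lock(m);
            points.push_back(std::move(v));
            if (points.size() > max_points)
                points.pop_front();  // overwrite-oldest semantics
            ++version;
        }
        cv.notify_all();
    }

    // Block until something newer than last_seen arrives, then snapshot.
    std::deque<T> wait_for_newer(uint64_t& last_seen)
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return version > last_seen; });
        last_seen = version;
        return points;  // copied while holding the lock - keep N small
    }
};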

7
votes

The implementation in the Boost library is worth considering. It's easy to use and fairly high performance. I wrote a test and ran it on a quad-core i7 laptop (8 threads) and got ~4M enqueue/dequeue operations a second. Another implementation not mentioned so far is the MPMC queue at http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue. I have done some simple testing with this implementation on the same laptop with 32 producers and 32 consumers. It is, as advertised, faster than the Boost lockless queue.
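
For reference, basic usage of the Boost queue looks roughly like this (a sketch; the capacity and element type are arbitrary - see the boost::lockfree documentation for the exact guarantees):

#include <boost/lockfree/queue.hpp>
#include <iostream>

int main()
{
    // Fixed-capacity MPMC queue; nodes are pre-allocated up front.
    boost::lockfree::queue<int> q(1024);

    q.push(42);        // returns false if the queue is full

    int value;
    if (q.pop(value))  // returns false if the queue is empty
        std::cout << value << '\n';
}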

As most of the other answers state, lockless programming is hard. Most implementations will have hard-to-detect corner cases that take a lot of testing and debugging to fix. These are typically fixed with careful placement of memory barriers in the code. You will also find proofs of correctness published in many of the academic articles. I prefer testing these implementations with a brute-force tool. Any lockless algorithm you plan on using in production should be checked for correctness using a tool like TLA+ (http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html).

6
votes

There is a pretty good series of articles about this on DDJ. As a sign of how difficult this stuff can be, it's a correction of an earlier article that got it wrong. Make sure you understand the mistakes before you roll your own )-;

5
votes

I am no expert on hardware memory models or lock-free data structures, so I tend to avoid those in my projects and go with traditional locked data structures.

However, I've recently noticed this video: Lockless SPSC queue based on ring buffer

It is based on an open-source, high-performance Java library called the LMAX Disruptor, which is used by a trading system: LMAX Disruptor

Based on the presentation above, you make the head and tail pointers atomic and atomically check for the condition where the head catches the tail from behind, or vice versa.

Below you can see a very basic C++11 implementation of it:

// USING SEQUENTIAL MEMORY (all atomic loads/stores default to memory_order_seq_cst)
#include <thread>
#include <atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            // One slot is always left empty so that a full buffer
            // (next_tail == read) is distinguishable from an empty one.
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;  // store the value first...
                write.store(next_tail);      // ...then publish the new tail
                return true;
            }

            return false;  // buffer is full
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())  // buffer is empty
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int64_t n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
            for(int i = 0; i<1000000; i++)
            {
                queue.push(i);
            }
        } );  // End of lambda expression

    std::thread read_thread( [&] () {
            for(int i = 0; i<1000000; i++)
            {
                queue.pop();
            }
        } );  // End of lambda expression

    write_thread.join();
    read_thread.join();

    return 0;
}

4
votes

One useful technique to reduce contention is to hash the items into multiple queues and have each consumer dedicated to a "topic".

For the most recent N items your consumers are interested in - you don't want to lock the whole queue and iterate over it to find an item to override - just publish items in N-tuples, i.e. all N recent items. Bonus points for an implementation where the producer blocks on a full queue (when consumers can't keep up) with a timeout, updating its local tuple cache - that way you don't put back-pressure on the data source.
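
A minimal sketch of the topic-hashing idea, assuming an integer topic id and a queue type supplied by the caller (the names are illustrative, not from any particular library):

#include <cstddef>
#include <functional>
#include <vector>

template <typename Queue>
class sharded_queues
{
    std::vector<Queue> shards;

public:
    explicit sharded_queues(size_t n) : shards(n) {}

    // Producers and consumers of the same topic always meet in the
    // same shard, so contention is split across n independent queues.
    Queue& for_topic(size_t topic_id)
    {
        return shards[std::hash<size_t>{}(topic_id) % shards.size()];
    }
};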

4
votes

Sutter's queue is sub-optimal and he knows it. The Art of Multiprocessor Programming is a great reference, but don't trust the Java guys on memory models, period. Ross's links will get you no definite answer, because their libraries had exactly these kinds of problems, and so on.

Doing lock-free programming is asking for trouble, unless you want to spend a lot of time on something that you are clearly over-engineering before solving the actual problem (judging by the description, it is the common madness of 'looking for perfection' in cache coherency). It takes years, and it leads to not solving the problem first and optimising later - a common disease.

3
votes

I would agree with this article and recommend against using lock-free data structures. A relatively recent paper on lock-free FIFO queues is this; search for further papers by the same author(s). There's also a PhD thesis from Chalmers regarding lock-free data structures (I lost the link). However, you did not say how large your elements are - lock-free data structures work efficiently only with word-sized items, so you'll have to dynamically allocate your elements if they're larger than a machine word (32 or 64 bits). If you dynamically allocate elements, you shift the (supposed, since you haven't profiled your program and you're basically doing premature optimization) bottleneck to the memory allocator, so you then need a lock-free memory allocator, e.g., Streamflow, and have to integrate it with your application.

3
votes

This is an old thread, but since it hasn't been mentioned yet - there is a lock-free, circular, 1-producer -> 1-consumer FIFO available in the JUCE C++ framework:

https://www.juce.com/doc/classAbstractFifo#details
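
Usage is roughly as below (a sketch based on the linked documentation; check your JUCE version for the exact API). AbstractFifo only manages the read/write positions - you supply the storage, and a transfer can span two contiguous blocks because the buffer wraps around:

// (include the JUCE headers as appropriate for your project setup)
juce::AbstractFifo fifo { 1024 };
float storage[1024];

void write_samples(const float* data, int n)
{
    int start1, size1, start2, size2;
    fifo.prepareToWrite(n, start1, size1, start2, size2);
    for (int i = 0; i < size1; ++i)
        storage[start1 + i] = data[i];
    for (int i = 0; i < size2; ++i)  // wrapped-around portion
        storage[start2 + i] = data[size1 + i];
    fifo.finishedWrite(size1 + size2);
}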

3
votes

Although this is an old question, no one has mentioned DPDK's lockless ring buffer. It's a high-throughput ring buffer that supports multiple producers and multiple consumers. It also provides single-consumer and single-producer modes, and the ring buffer is wait-free in SPSC mode. It's written in C and supports multiple architectures.

In addition, it supports Bulk and Burst modes, where items can be enqueued/dequeued in bulk. The design lets multiple consumers or multiple producers write to the queue at the same time by simply reserving space through moving an atomic pointer.
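
For flavour, ring usage looks roughly like this (a sketch based on my reading of the rte_ring API - consult the DPDK docs for your version; the EAL must be initialized first):

#include <rte_eal.h>
#include <rte_lcore.h>
#include <rte_ring.h>

int demo(int argc, char** argv)
{
    if (rte_eal_init(argc, argv) < 0)
        return -1;

    /* flags = 0 gives multi-producer/multi-consumer; RING_F_SP_ENQ and
       RING_F_SC_DEQ select single-producer/single-consumer modes. */
    struct rte_ring* ring = rte_ring_create("demo_ring", 1024,
                                            rte_socket_id(), 0);
    if (ring == NULL)
        return -1;

    static int value = 42;
    rte_ring_enqueue(ring, &value);  /* returns 0 on success */

    void* out = NULL;
    rte_ring_dequeue(ring, &out);    /* returns 0 on success */
    return 0;
}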

3
votes

Some time ago, I found a nice solution to this problem. I believe it is the smallest found so far.

The repository has an example of how to use it to create N threads (readers and writers) and make them share a single seat.

I made some benchmarks on the test example and got the following results (in million ops/sec):

[Benchmark charts omitted: throughput by buffer size and by number of threads]

Notice how the number of threads does not change the throughput.

I think this is the ultimate solution to this problem. It works and is incredibly fast and simple, even with hundreds of threads and a queue of a single position. It can be used as a pipeline between threads, allocating space inside the queue.

Can you break it?

2
votes

Just for completeness: there's a well-tested lock-free circular buffer in OtlContainers, but it is written in Delphi (TOmniBaseBoundedQueue is a circular buffer and TOmniBaseBoundedStack is a bounded stack). There's also an unbounded queue in the same unit (TOmniBaseQueue). The unbounded queue is described in Dynamic lock-free queue - doing it right. The initial implementation of the bounded queue (circular buffer) was described in A lock-free queue, finally!, but the code has been updated since then.

2
votes

Check out Disruptor (How to use it), which is a ring buffer that multiple threads can subscribe to.

1
votes

Here is how I would do it:

  • map the queue into an array
  • keep state with next-read and next-write indexes
  • keep an empty/full bit vector around

Insertion consists of using a CAS with an increment and roll-over on the next-write index. Once you have a slot, add your value and then set the empty/full bit that matches it.

Removals require a check of the bit first to test for underflows, but other than that they are the same as for the write, only using the read index and clearing the empty/full bit. (A rough sketch follows below.)
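
A rough C++11 sketch of the above (untested, names invented; as warned below, this glosses over what happens when threads lap each other):

#include <atomic>
#include <cstddef>

template <typename T, size_t N>  // N must be a power of 2
class flagged_ring
{
    T slots[N];
    std::atomic<bool> full[N];   // the empty/full bit vector
    std::atomic<size_t> next_write{0};
    std::atomic<size_t> next_read{0};

public:
    flagged_ring()
    {
        for (auto& f : full)
            f.store(false, std::memory_order_relaxed);
    }

    void insert(T v)
    {
        // claim a slot: atomic increment with roll-over via masking
        size_t i = next_write.fetch_add(1, std::memory_order_relaxed) & (N - 1);
        while (full[i].load(std::memory_order_acquire))
            ;  // slot still holds an unconsumed value: wait
        slots[i] = std::move(v);
        full[i].store(true, std::memory_order_release);  // set the full bit
    }

    T remove()
    {
        size_t i = next_read.fetch_add(1, std::memory_order_relaxed) & (N - 1);
        while (!full[i].load(std::memory_order_acquire))
            ;  // underflow check: wait until the slot is filled
        T v = std::move(slots[i]);
        full[i].store(false, std::memory_order_release);  // clear the full bit
        return v;
    }
};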

Be warned,

  1. I'm no expert in these things
  2. atomic ASM ops seem to be very slow when I've used them, so if you end up with more than a few of them, you might be faster using locks embedded inside the insert/remove functions. The theory is that a single atomic op to grab the lock followed by (very) few non-atomic ASM ops might be faster than the same thing done by several atomic ops. But to make this work would require manual or automatic inlining, so it's all one short block of ASM.

1
votes

You may try lfqueue.

It is simple to use and has a circular, lock-free design.

int *int_data;  // payload pointer
int *ret;
int i = 0;      // value counter

lfqueue_t results;

lfqueue_init(&results);

/** Wrap this scope in multithreaded testing **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/* Enqueue */
while (lfqueue_enq(&results, int_data) != 1) ;

/* Dequeue */
while ( (ret = lfqueue_deq(&results)) == NULL);

// printf("%d\n", *(int*) ret );
free(ret);
/** End **/

lfqueue_clear(&results);

1
votes

There are situations where you don't need locking to prevent race conditions, especially when you have only one producer and one consumer.

Consider this paragraph from LDD3:

When carefully implemented, a circular buffer requires no locking in the absence of multiple producers or consumers. The producer is the only thread that is allowed to modify the write index and the array location it points to. As long as the writer stores a new value into the buffer before updating the write index, the reader will always see a consistent view. The reader, in turn, is the only thread that can access the read index and the value it points to. With a bit of care to ensure that the two pointers do not overrun each other, the producer and the consumer can access the buffer concurrently with no race conditions.
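
That pattern maps directly onto C++11 atomics: the producer's release store of the write index publishes the data written before it, and the consumer's acquire load pairs with it. A minimal sketch (my own illustration, names invented; N must be a power of two):

#include <atomic>
#include <cstddef>

template <typename T, size_t N>
class spsc_ring
{
    T buf[N];
    std::atomic<size_t> write{0};  // modified only by the producer
    std::atomic<size_t> read{0};   // modified only by the consumer

public:
    bool try_put(const T& v)
    {
        size_t w = write.load(std::memory_order_relaxed);
        if (w - read.load(std::memory_order_acquire) == N)
            return false;                               // full
        buf[w & (N - 1)] = v;                           // store the value first...
        write.store(w + 1, std::memory_order_release);  // ...then update the index
        return true;
    }

    bool try_get(T& v)
    {
        size_t r = read.load(std::memory_order_relaxed);
        if (r == write.load(std::memory_order_acquire))
            return false;                               // empty
        v = buf[r & (N - 1)];
        read.store(r + 1, std::memory_order_release);
        return true;
    }
};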

0
votes

If you take as a prerequisite that the buffer will never become full, consider using this lock-free algorithm:

// C++11 rendering of the algorithm described above; capacity must be a
// power of 2, and the indexes live on separate cache lines (alignas).
#include <atomic>
#include <cstddef>

template <typename T, size_t Capacity>
class never_full_ring
{
    static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2");
    static const size_t mask = Capacity - 1;
    std::atomic<T*> buffer[Capacity];                // nullptr == empty slot
    alignas(64) std::atomic<size_t> write_index{0};  // on its own cache line
    alignas(64) std::atomic<size_t> read_index{0};   // on its own cache line

public:
    never_full_ring()
    {
        for (auto& slot : buffer)
            slot.store(nullptr, std::memory_order_relaxed);
    }

    void enqueue(T* element)
    {
        size_t write_i = write_index.fetch_add(1) & mask;
        buffer[write_i].store(element, std::memory_order_release);
    }

    T* dequeue()
    {
        size_t read_i = read_index.fetch_add(1) & mask;
        T* element;
        // spin until the producer has filled this slot
        while ((element = buffer[read_i].load(std::memory_order_acquire)) == nullptr)
            ;
        buffer[read_i].store(nullptr, std::memory_order_relaxed);
        return element;
    }
};