4
votes

I'm trying to implement a lock-free queue that uses a linear circular buffer to store data. In contrast to a general-purpose lock-free queue I have the following relaxing conditions:

  • I know the worst-case number of elements that will ever be stored in the queue. The queue is part of a system that operates on a fixed set of elements. The code will never attempt to store more elements in the queue as there are elements in this fixed set.
  • No multi-producer/multi-consumer. The queue will either be used in a multi-producer/single-consumer or a single-producer/multi-consumer setting.

Conceptually, the queue is implemented as follows

  • Standard power-of-two ring buffer. The underlying data-structure is a standard ring-buffer using the power-of-two trick. Read and write indices are only ever incremented. They are clamped to the size of the underlying array when indexing into the array using a simple bitmask. The read pointer is atomically incremented in pop(), the write pointer is atomically incremented in push().
  • Size variable gates access to pop(). An additional "size" variable tracks the number of elements in the queue. This eliminates the need to perform arithmetic on the read and write indices. The size variable is atomically incremented after the entire write operation has taken place, i.e. the data has been written to the backing storage and the write cursor has been incremented. I'm using a compare-and-swap (CAS) operation to atomically decrement size in pop() and only continue, if size is non-zero. This way pop() should be guaranteed to return valid data.

My queue implementation is as follows. Note the debug code that halts execution whenever pop() attempts to read past the memory that has previously been written by push(). This should never happen, since ‒ at least conceptually ‒ pop() may only proceed if there are elements on the queue (there should be no underflows).

#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging

template <typename T>
class Queue {
private:
    uint32_t m_data_size;   // Number of elements allocated
    std::atomic<T> *m_data; // Queue data, size is power of two
    uint32_t m_mask;        // Bitwise AND mask for m_rd_ptr and m_wr_ptr
    std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
    std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
    std::atomic<uint32_t> m_size;   // Number of elements in the queue

    static uint32_t upper_power_of_two(uint32_t v) {
        v--; // https://graphics.stanford.edu/~seander/bithacks.html
        v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
        v++;
        return v;
    }

public:
    struct Optional { // Minimal replacement for std::optional
        bool good;
        T value;
        Optional() : good(false) {}
        Optional(T value) : good(true), value(std::move(value)) {}
        explicit operator bool() const { return good; }
    };

    Queue(uint32_t max_size)
        : // XXX Allocate 1 MiB of additional memory for debugging purposes
          m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
          m_data(new std::atomic<T>[m_data_size]),
          m_mask(m_data_size - 1),
          m_rd_ptr(0),
          m_wr_ptr(0),
          m_size(0) {
        // XXX Debug code begin
        // Fill the memory with a marker so we can detect invalid reads
        for (uint32_t i = 0; i < m_data_size; i++) {
            m_data[i] = 0xDEADBEAF;
        }
        // XXX Debug code end
    }

    ~Queue() { delete[] m_data; }

    Optional pop() {
        // Atomically decrement the size variable
        uint32_t size = m_size.load();
        while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
        }

        // The queue is empty, abort
        if (size <= 0) {
            return Optional();
        }

        // Read the actual element, atomically increase the read pointer
        T res = m_data[(m_rd_ptr++) & m_mask].load();

        // XXX Debug code begin
        if (res == T(0xDEADBEAF)) {
            std::raise(SIGTRAP);
        }
        // XXX Debug code end
        return res;
    }

    void push(T t) {
        m_data[(m_wr_ptr++) & m_mask].store(t);
        m_size++;
    }

    bool empty() const { return m_size == 0; }
};

However, underflows do occur and can easily be triggered in a multi-threaded stress-test. In this particular test I maintain two queues q1 and q2. In the main thread I feed a fixed number of elements into q1. Two worker threads read from q1 and push onto q2 in a tight loop. The main thread reads data from q2 and feeds it back to q1.

This works fine if there is only one worker-thread (single-producer/single-consumer) or as long as all worker-threads are on the same CPU as the main thread. However, it fails as soon as there are two worker threads that are explicitly scheduled onto a different CPU than the main thread.

The following code implements this test

#include <pthread.h>
#include <thread>
#include <vector>

static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
                                   Queue<int> &queue_rd, Queue<int> &queue_wr) {
    for (size_t i = 0; i < (1UL << 24); i++) {
        auto res = queue_rd.pop();
        if (res) {
            queue_wr.push(res.value);
        }
    }
    done_count++;
}

static void set_thread_affinity(pthread_t thread, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
                               &cpuset) != 0) {
        throw "Error while calling pthread_setaffinity_np";
    }
}

int main() {
    static constexpr uint32_t n_threads{2U}; // Number of worker threads
    //static constexpr uint32_t n_threads{1U}; // < Works fine
    static constexpr uint32_t max_size{16U}; // Elements in the queue
    std::atomic<uint32_t> done_count{0};     // Number of finished threads
    Queue<int> queue1(max_size), queue2(max_size);

    // Launch n_threads threads, make sure the main thread and the two worker
    // threads are on different CPUs.
    std::vector<std::thread> threads;
    for (uint32_t i = 0; i < n_threads; i++) {
        threads.emplace_back(queue_stress_test_main, std::ref(done_count),
                             std::ref(queue1), std::ref(queue2));
        set_thread_affinity(threads.back().native_handle(), 0);
    }
    set_thread_affinity(pthread_self(), 1);
    //set_thread_affinity(pthread_self(), 0); // < Works fine

    // Pump data from queue2 into queue1
    uint32_t elems_written = 0;
    while (done_count < n_threads || !queue2.empty()) {
        // Initially fill queue1 with all values from 0..max_size-1
        if (elems_written < max_size) {
            queue1.push(elems_written++);
        }

        // Read elements from queue2 and put them into queue1
        auto res = queue2.pop();
        if (res) {
            queue1.push(res.value);
        }
    }

    // Wait for all threads to finish
    for (uint32_t i = 0; i < n_threads; i++) {
        threads[i].join();
    }
}

Most of the time this program triggers the trap in the queue code, which means that pop() attempts to read memory that has never been touched by push() ‒ although pop() should only succeed if push() has been called at least as often as pop().

You can compile and run the above program with GCC/clang on Linux using

c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue

Either just concatenate the above two code blocks or download the complete program here.

Note that I'm a complete novice when it comes to lock-free datastructures. I'm perfectly aware that there are plenty of battle-tested lock-free queue implementations for C++. However, I simply can't figure out why the above code does not work as intended.

1
could not reproduce your problem :repl.it/repls/FlashyMediocreDebuggersPapaDiHatti
@Kapil: Thanks for trying this out! Although (as pointed out in the comments below) the algorithm is fundamentally flawed, this will only fail with a high probability on a decently fast SMP machine. It is better to try this algorithm without any virtualisation in an execution environment you have full control over. Plus, putting a print statement in the thread is bad idea, since the loop has to be as tight as possible to increase the chance of triggering the concurrency issue.Andreas Stöckel

1 Answers

3
votes

You have two bugs, one of which can cause the failure you observe.

Let's look at your push code, except we'll allow only one operation per statement:

void push(T t)
{
    auto const claimed_index = m_wr_ptr++;               /* 1 */
    auto const claimed_offset = claimed_index & m_mask; /* 2 */
    auto& claimed_data = m_data[claimed_offset];         /* 3 */
    claimed_data.store(t);                               /* 4 */
    m_size++;                                            /* 5 */
}

Now, for a queue with two producers, there is a window of vulnerability to a race condition between operations 1 and 4:

Before:

m_rd_ptr == 1
m_wr_ptr == 1
m_size == 0

Producer A:

/* 1 */ claimed_index = 1; m_wr_ptr = 2;
/* 2 */ claimed_offset = 1;
  • Scheduler puts Producer A to sleep here

Producer B:

/* 1 */ claimed_index = 2; m_wr_ptr = 3;
/* 2 */ claimed_offset = 2;
/* 3 */ claimed_data = m_data[2];
/* 4 */ claimed_data.store(t);
/* 5 */ m_size = 1;

After:

m_size == 1
m_rd_ptr == 1
m_wr_ptr == 3
m_data[1] == 0xDEADBEAF
m_data[2] == value_produced_by_B

The consumer now runs, sees m_size > 0, and reads from m_data[1] while increasing m_rd_ptr from 1 to 2. But m_data[1] hasn't been written by Producer A yet, and Producer B wrote to m_data[2].

The second bug is the complementary case in pop() when a consumer thread is interrupted between the m_rd_ptr++ action and the .load() call. It can result in reading values out of order, potentially so far out of order that the queue has completely circled and overwritten the original value.

Just because two operations in a single source statement are atomic does not make the entire statement atomic.