4
votes

Short version:

I'm trying to replace std::atomic from C++11, which is used in a lock-free, single-producer, single-consumer queue implementation from here. How do I replace it with boost::atomic?

Long version:

I'm trying to get better performance out of our app with worker threads. Each thread has its own task queue, and we currently have to synchronize with a lock before enqueuing or dequeuing each task.

Then I found Herb Sutter's article on a lock-free queue. It seems like an ideal replacement, but the code uses std::atomic from C++11, which I can't introduce to the project at this time.

More googling led to some examples, such as this one for Linux (echelon's) and this one for Windows (TINESWARE's). Both use platform-specific constructs such as WinAPI's InterlockedExchangePointer and GCC's __sync_lock_test_and_set.

I only need to support Windows & Linux, so maybe I can get away with some #ifdefs. But I thought it might be nicer to use what boost::atomic provides. Boost.Atomic is not part of the official Boost library yet, so I downloaded the source from http://www.chaoticmind.net/~hcb/projects/boost.atomic/ and used the include files in my project.

This is what I have so far:
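For reference, the #ifdef route could look roughly like the sketch below. The helper name exchange_pointer is made up for illustration; only InterlockedExchangePointer and __sync_lock_test_and_set are real. Note that GCC documents __sync_lock_test_and_set as an acquire barrier rather than a full barrier, so the two branches are not guaranteed to give identical ordering.

```cpp
#ifdef _WIN32
#include <windows.h>
#endif

// Hypothetical helper (not from either linked example): atomically writes
// new_value into *target and returns the pointer that was stored before.
inline void* exchange_pointer(void* volatile* target, void* new_value)
{
#ifdef _WIN32
    // WinAPI atomic pointer exchange.
    return InterlockedExchangePointer(target, new_value);
#else
    // GCC builtin; an acquire barrier only, per the GCC documentation.
    return __sync_lock_test_and_set(target, new_value);
#endif
}
```

Every shared pointer update in the queue would have to go through a wrapper like this, which is the bookkeeping that boost::atomic takes care of.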

#pragma once

#include <boost/atomic.hpp>

template <typename T>
class LockFreeQueue
{
private:
    struct Node
    {
        Node(T val) : value(val), next(NULL) { }
        T value;
        Node* next;
    };
    Node* first; // for producer only
    boost::atomic<Node*> divider;  // shared
    boost::atomic<Node*> last; // shared

public:
    LockFreeQueue()
    {
        first = new Node(T());
        divider = first;
        last= first;
    }

    ~LockFreeQueue()
    {
        while(first != NULL) // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void Produce(const T& t)
    {
        last.load()->next = new Node(t); // add the new item
        last = last.load()->next;
        while(first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }

    bool Consume(T& result)
    {
        if(divider != last) // if queue is nonempty
        {
            result = divider.load()->next->value; // C: copy it back
            divider = divider.load()->next;
            return true;  // and report success
        }
        return false;  // else report empty
    }
};

Some modifications to note:

boost::atomic<Node*> divider;  // shared
boost::atomic<Node*> last; // shared

and

    last.load()->next = new Node(t); // add the new item
    last = last.load()->next;

and

        result = divider.load()->next->value; // C: copy it back
        divider = divider.load()->next;

Am I applying load() (and the implicit store()) from boost::atomic correctly here? Can we say this is equivalent to Sutter's original C++11 lock-free queue?
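To make the question concrete, here is how I understand the implicit operations when they are spelled out. This is only a sketch: it uses std::atomic as a stand-in, on the assumption that boost::atomic offers the same load()/store() members with the same sequentially consistent default, and the names publish and take are made up for illustration.

```cpp
#include <atomic>
#include <cstddef>

struct Node
{
    explicit Node(int v) : value(v), next(NULL) {}
    int value;
    Node* next;
};

// Producer side: link the new node, then publish it with an explicit store.
inline void publish(std::atomic<Node*>& last, Node* fresh)
{
    Node* tail = last.load();  // same as last.load() in Produce()
    tail->next = fresh;        // plain write, published by the store below
    last.store(fresh);         // what "last = ..." does implicitly
}

// Consumer side: explicit form of "if (divider != last)" plus the advance.
inline bool take(std::atomic<Node*>& divider, std::atomic<Node*>& last, int& out)
{
    Node* d = divider.load();
    if (d == last.load())      // the comparison loads both atomics
        return false;          // queue is empty
    out = d->next->value;
    divider.store(d->next);    // what "divider = ..." does implicitly
    return true;
}
```

Loading each atomic once into a local also avoids the repeated load() calls in my version above.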

PS. I studied many of the threads on SO, but none seems to provide an example for boost::atomic & lock-free queue.

2
Before you start poking around the lock-free hornet's nest, consider giving larger payloads to your worker threads so that the time spent on mutex contention becomes less significant. For example, if you are processing a stream of data, gather the data into large chunks before passing it on to a worker thread. - Emile Cormier
FWIW, please note that the Lockfree library was accepted into Boost: lists.boost.org/boost-announce/2011/08/0331.php and tim.klingt.org/boost_lockfree - Igor R.
@EmileCormier I'll work on this, thx. - Gant
@m3rLinEz Did you use your code successfully? - subb
As Igor R mentioned, the lockfree queue is already available in Boost. You can use it directly. - Kemin Zhou

2 Answers

1
votes

Have you tried Intel Threading Building Blocks' atomic<T>? It is cross-platform and free.

Also...

Single producer/single consumer makes your problem much easier because your linearization point can be a single atomic operation. It becomes easier still if you are prepared to accept a bounded queue.

A bounded queue offers advantages for cache performance because you can reserve a cache-aligned memory block to maximize your hits, e.g.:

#include <cstddef>
#include <vector>
#include "tbb/atomic.h"
#include "tbb/cache_aligned_allocator.h"

template< typename T >
class BoundedQueue {
    typedef std::vector<T, tbb::cache_aligned_allocator<T> > queue_type;

public:
    // Size the vector up front: reserve() alone leaves size() at 0,
    // so indexing into it would be undefined behaviour.
    explicit BoundedQueue(std::size_t capacity):
        queue(capacity) {
        head = 0;
        tail = 0;
    }

    std::size_t capacity() const {
        return queue.size();
    }

    bool try_pop(T& result) { // consumer thread only
        if(tail - head == 0)
            return false;
        else {
            result = queue[head % queue.size()];
            head.fetch_and_increment(); //linearization point
            return(true);
        }
    }

    bool try_push(const T& source) { // producer thread only
        if(tail - head == queue.size())
            return(false);
        else {
            queue[tail % queue.size()] = source;
            tail.fetch_and_increment(); //linearization point
            return(true);
        }
    }

    ~BoundedQueue() {}

private:
    queue_type queue;
    tbb::atomic<std::size_t> head; // unsigned, so tail - head compares cleanly with size()
    tbb::atomic<std::size_t> tail;
};
0
votes

Check out this boost.atomic ringbuffer example from the documentation:

#include <boost/atomic.hpp>

template <typename T, size_t Size>
class ringbuffer
{
public:
    ringbuffer() : head_(0), tail_(0) {}

    bool push(const T & value)
    {
        size_t head = head_.load(boost::memory_order_relaxed);
        size_t next_head = next(head);
        if (next_head == tail_.load(boost::memory_order_acquire))
            return false;
        ring_[head] = value;
        head_.store(next_head, boost::memory_order_release);
        return true;
    }

    bool pop(T & value)
    {
        size_t tail = tail_.load(boost::memory_order_relaxed);
        if (tail == head_.load(boost::memory_order_acquire))
            return false;
        value = ring_[tail];
        tail_.store(next(tail), boost::memory_order_release);
        return true;
    }

private:
    size_t next(size_t current)
    {
        return (current + 1) % Size;
    }

    T ring_[Size];
    boost::atomic<size_t> head_, tail_;
};

// How to use    
int main()
{
    ringbuffer<int, 32> r;

    // try to insert an element
    if (r.push(42)) { /* succeeded */ }
    else { /* buffer full */ }

    // try to retrieve an element
    int value;
    if (r.pop(value)) { /* succeeded */ }
    else { /* buffer empty */ }
}

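The code's main limitation is that the buffer length has to be known at compile time (or at construction time, if you replace the array with a std::vector<T>). Note also that one slot always stays empty so that a full buffer can be told apart from an empty one, which means ringbuffer<int, 32> holds at most 31 elements. Allowing the buffer to grow and shrink is not trivial, as far as I understand.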
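A minimal sketch of the construction-time variant, for illustration only. It uses std::atomic so that it is self-contained; I am assuming boost::atomic and the boost::memory_order_* constants can be substituted one for one. The class name dynamic_ringbuffer is made up, and the caller is assumed to pass at least 2 slots.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

template <typename T>
class dynamic_ringbuffer  // hypothetical name, not part of Boost
{
public:
    // slots must be >= 2; the buffer holds at most slots - 1 elements.
    explicit dynamic_ringbuffer(std::size_t slots)
        : ring_(slots), head_(0), tail_(0) {}

    bool push(const T& value)  // producer thread only
    {
        std::size_t head = head_.load(std::memory_order_relaxed);
        std::size_t next_head = next(head);
        if (next_head == tail_.load(std::memory_order_acquire))
            return false;  // full
        ring_[head] = value;
        head_.store(next_head, std::memory_order_release);
        return true;
    }

    bool pop(T& value)  // consumer thread only
    {
        std::size_t tail = tail_.load(std::memory_order_relaxed);
        if (tail == head_.load(std::memory_order_acquire))
            return false;  // empty
        value = ring_[tail];
        tail_.store(next(tail), std::memory_order_release);
        return true;
    }

private:
    std::size_t next(std::size_t current) const
    {
        return (current + 1) % ring_.size();
    }

    std::vector<T> ring_;  // sized once in the constructor, never resized
    std::atomic<std::size_t> head_, tail_;
};
```

The vector is never resized after construction, so the indices stay valid for the lifetime of the object; resizing while both threads are running is exactly the non-trivial part.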