2 votes

I was looking at the sample code for a lock-free queue at:

http://drdobbs.com/high-performance-computing/210604448?pgno=2

(Also referenced in many SO questions, such as "Is there a production ready lock-free queue or hash implementation in C++".)

This looks like it should work for a single producer/consumer, although there are a number of typos in the code. I've updated the code to read as shown below, but it's crashing on me. Anybody have suggestions why?

In particular, should divider and last be declared as something like:

atomic<Node *> divider, last;    // shared

I don't have a compiler supporting C++0x on this machine, so perhaps that's all I need...

// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// My attempted fix is below.

template <typename T>
class LockFreeQueue {
private:
    struct Node {
        Node( T val ) : value(val), next(0) { }
        T value;
        Node* next;
    };
    Node *first;              // for producer only
    Node *divider, *last;     // shared
public:
    LockFreeQueue()
    {
        first = divider = last = new Node(T()); // add dummy separator
    }
    ~LockFreeQueue()
    {
        while( first != 0 )    // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }
    void Produce( const T& t )
    {
        last->next = new Node(t);    // add the new item
        last = last->next;      // publish it

        while (first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }
    bool Consume( T& result )
    {
        if (divider != last)         // if queue is nonempty
        {
            result = divider->next->value; // C: copy it back
            divider = divider->next;      // D: publish that we took it
            return true;                  // and report success
        }
        return false;                   // else report empty
    }
};

I wrote the following code to test this; main() (not shown) just calls TestQ().

#include "LockFreeQueue.h"

const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);

void *Solver(void *whichID)
{
    int id = (long)whichID;
    printf("Thread %d initialized\n", id);
    int result = 0;
    do {
        if (q[id].Consume(result))
        {
            int y = 0;
            for (int x = 0; x < result; x++)
            { y++; }
            y = 0;
        }
    } while (result != -1);
    return 0;
}


void TestQ()
{
    std::vector<pthread_t> threads;
    for (int x = 0; x < numThreads; x++)
    {
        pthread_t thread;
        pthread_create(&thread, NULL, Solver, (void *)x);
        threads.push_back(thread);
    }
    for (int y = 0; y < 1000000; y++)
    {
        for (unsigned int x = 0; x < threads.size(); x++)
        {
            q[x].Produce(y);
        }
    }
    for (unsigned int x = 0; x < threads.size(); x++)
    {
        q[x].Produce(-1);
    }
    for (unsigned int x = 0; x < threads.size(); x++)
        pthread_join(threads[x], 0);    
}

Update: It turns out that the crash is being caused by the queue declaration:

std::vector<LockFreeQueue<int> > q(numThreads);

When I change this to be a simple array, it runs fine. (I implemented a version with locks and it was crashing too.) I see that the destructor is being called immediately after the constructor, resulting in a double free. But does anyone know WHY the destructor would be called immediately with a std::vector?
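My current guess: a pre-C++11 std::vector<T> q(n) is allowed to default-construct a single temporary T, copy-construct each of the n elements from it, and then destroy the temporary. LockFreeQueue has no copy constructor, so the compiler-generated one just copies the three Node pointers; once the temporary is destroyed, every element is left pointing at freed nodes, which would explain the double free. A minimal illustration of that shallow copy (hypothetical, not a fix):

#include "LockFreeQueue.h"

// Two queues end up sharing one node list via the compiler-generated copy.
void demonstrateDoubleFree()
{
    LockFreeQueue<int> tmp;        // constructor allocates the dummy node
    LockFreeQueue<int> copy(tmp);  // shallow copy: shares first/divider/last
}   // both destructors delete the same node list here -> double free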

@OliCharlesworth: "malloc: *** error for object 0x100100a20: pointer being freed was not allocated" on the line "delete tmp;" – Nathan S.
Another simple way to implement a single-reader/single-writer queue is to use two queues and an atomic "swap buffer": queue R for reading, queue W for writing, and a swap slot S that is initially set to null. Usage: after the producer is done writing, exchange W <-> S. The consumer reads from R as usual, exchanging R <-> S if R is exhausted and trying one more read after the swap. Lastly, if a reader (or writer) encounters a nullptr before reading (or writing), it exchanges R <-> S (or W <-> S)... – pauluss86
... The advantage is simplicity; the disadvantage is that a consumer could repeatedly exchange empty queues if nothing was written between reads. In my implementation, I added a listen() method that blocks the reader until it's notified by the writer. – pauluss86
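For what it's worth, here is a rough, untested sketch of that scheme as I understand the comment, using std::queue plus an atomic swap slot; the class name and all details are illustrative, not pauluss86's actual code:

#include <atomic>
#include <queue>

// Single producer / single consumer. Exactly one of {W, R, S} is null at any
// time, so an exchange against a null slot always yields a real queue.
template <typename T>
class SwapBufferQueue {
    std::queue<T>* W;                  // held by the producer
    std::queue<T>* R;                  // held by the consumer
    std::atomic<std::queue<T>*> S;     // shared swap slot, initially empty

public:
    SwapBufferQueue()
        : W(new std::queue<T>()), R(new std::queue<T>()), S(nullptr) { }

    ~SwapBufferQueue() { delete W; delete R; delete S.load(); }

    void Produce(const T& v)
    {
        if (!W) W = S.exchange(nullptr);   // handed ours off earlier; take one back
        W->push(v);
        W = S.exchange(W);                 // publish, taking whatever was in the slot
    }

    bool Consume(T& out)
    {
        if (!R || R->empty()) {
            R = S.exchange(R);                   // swap the exhausted queue for the slot's
            if (!R || R->empty()) return false;  // nothing published yet
        }
        out = R->front();
        R->pop();
        return true;
    }
};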

3 Answers

1 vote

You'll need to make several of the pointers std::atomic, as you note, and you'll need to use compare_exchange_weak in a loop to update them atomically. Otherwise, multiple consumers might consume the same node and multiple producers might corrupt the list.
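For reference, the usual shape of such a compare_exchange_weak loop, shown here on a simple Treiber-style stack push rather than on this exact queue (the stack and names are only for illustration):

#include <atomic>

struct Node {
    int value;
    Node* next;
};

// Retry until no other producer has changed head since we read it.
void push(std::atomic<Node*>& head, Node* n)
{
    n->next = head.load(std::memory_order_relaxed);
    // On failure, compare_exchange_weak reloads the current head into n->next,
    // so the loop body is empty and we simply try again.
    while (!head.compare_exchange_weak(n->next, n,
                                       std::memory_order_release,
                                       std::memory_order_relaxed))
    {
        // expected (n->next) was refreshed with the current head; retry
    }
}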

1 vote

It's critically important that these writes (just one example from your code) occur in order:

last->next = new Node(t);    // add the new item
last = last->next;      // publish it

That's not guaranteed by C++ -- the optimizer can rearrange things however it likes, as long as the current thread always acts as if the program ran exactly the way you wrote it. And then the CPU and its memory system can reorder things further.

You need memory fences. Making the pointers use the atomic type should have that effect.
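Here is a hedged sketch of what that might look like with C++11 atomics: the release store in Produce pairs with the acquire load in Consume, so a consumer that sees the new last also sees the fully written node it points to. The class name is mine and the code is untested; it only works for a single producer and a single consumer.

#include <atomic>

template <typename T>
class AtomicLockFreeQueue {
private:
    struct Node {
        Node(T val) : value(val), next(0) { }
        T value;
        Node* next;
    };
    Node* first;                 // producer only
    std::atomic<Node*> divider;  // shared
    std::atomic<Node*> last;     // shared
public:
    AtomicLockFreeQueue()
    {
        first = new Node(T());   // dummy separator, as before
        divider.store(first);
        last.store(first);
    }
    ~AtomicLockFreeQueue()
    {
        while (first != 0)       // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }
    void Produce(const T& t)
    {
        Node* n = new Node(t);
        last.load(std::memory_order_relaxed)->next = n;  // write the link first...
        last.store(n, std::memory_order_release);        // ...then publish it

        // trim nodes the consumer has already moved past
        while (first != divider.load(std::memory_order_acquire))
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }
    bool Consume(T& result)
    {
        Node* div = divider.load(std::memory_order_relaxed);
        if (div != last.load(std::memory_order_acquire))  // pairs with the release above
        {
            result = div->next->value;                    // copy it back
            divider.store(div->next, std::memory_order_release); // publish that we took it
            return true;
        }
        return false;
    }
};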

0 votes

This could be totally off the mark, but I can't help but wonder whether you're having some sort of static initialization related issue... For laughs, try declaring q as a pointer to a vector of lock-free queues and allocating it on the heap in main().
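If it helps, that experiment would look roughly like this (hypothetical; the element accesses in the test code would become (*q)[x] as well):

#include <vector>
#include "LockFreeQueue.h"

void TestQ();   // defined in the test code above

const int numThreads = 1;
std::vector<LockFreeQueue<int> >* q = 0;   // just a pointer; nothing constructed before main()

int main()
{
    q = new std::vector<LockFreeQueue<int> >(numThreads);  // allocate on the heap instead
    TestQ();
    delete q;
    return 0;
}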