I'm writing a lock-free single-producer, single-consumer growable queue in C++ for a real-time system. The internal queue works, but it needs to be growable. The producer thread is real-time, so every operation on it must be deterministic (no waits, locks, or memory allocations); the consumer thread has no such constraint.
The idea is therefore that the consumer thread occasionally grows the queue when needed. The queue is implemented in such a way that it cannot be grown from the consumer end, so the actual queue is wrapped in an object that dispatches calls to it. Growth is implemented by swapping the pointer to the internal queue for a new one, while keeping the old one around in case the producer thread is still using it.
The problem, however, is that I cannot figure out how to prove, without resorting to locks, when the producer thread has stopped using the old queue so that it is safe to delete. Here is a pseudocode representation:

    #include <atomic>

    template<typename T>
    class queue
    {
    public:
        queue()
            : old(nullptr)
        {
            current.store(nullptr);
            grow();
        }

        // producer thread only
        bool produce(const T & data)
        {
            qimpl * q = current.load();
            return q->produce(data);
        }

        // consumer thread only
        bool consume(T & data)
        {
            // Has the queue grown? If so, both an old and a new queue exist; drain the old one first.
            if (old)
            {
                // Here is the problem: we never really know when the producer thread stops using
                // the old queue and starts using the new one. It could be halfway through inserting
                // an item right now, while the following consume() call fails.
                // Thus it is not yet safe to delete the old queue.
                // We do know, however, that it takes at most one call to produce() after grow()
                // before the producer thread starts using the new queue.
                if (old->consume(data))
                {
                    return true;
                }
                else
                {
                    delete old;
                    old = nullptr;
                }
            }
            if (current.load()->consume(data))
            {
                return true;
            }
            return false;
        }

        // consumer thread only
        void grow()
        {
            old = current.load();
            current.store(new qimpl());
        }

    private:
        class qimpl
        {
        public:
            bool produce(const T & data);
            bool consume(T & data);
        };

        std::atomic<qimpl *> current;
        qimpl * old;
    };

Note that ATOMIC_POINTER_LOCK_FREE == 2 is a condition for the code to compile. The only provable condition I see is that once grow() has been called, the next produce() call to start will use the new internal queue. Thus, if an atomic counter inside produce() is incremented on each call, it is safe to delete the old queue at N + 1, where N is the count at the time of the grow() call. The issue, however, is that you then need to atomically swap in the new pointer and store the count, which does not seem possible.
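The compile-time condition on ATOMIC_POINTER_LOCK_FREE can be spelled out as a `static_assert`. This is only a minimal sketch, assuming C++11 or later; `pointer_atomics_are_lock_free()` is a hypothetical helper name used for illustration.

```cpp
#include <atomic>

// ATOMIC_POINTER_LOCK_FREE is 0 (never), 1 (sometimes) or 2 (always lock-free).
// Refuse to compile unless atomic pointers are always lock-free.
static_assert(ATOMIC_POINTER_LOCK_FREE == 2,
              "std::atomic<T*> must always be lock-free on this platform");

// Runtime counterpart of the same check (hypothetical helper).
bool pointer_atomics_are_lock_free()
{
    std::atomic<int *> p{nullptr};
    return p.is_lock_free();
}
```

With the assertion in place, a platform that falls back to locks for pointer atomics is rejected at compile time rather than silently breaking the real-time guarantee on the producer side.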
Any ideas are welcome. For reference, this is how the system would work:

    #include <chrono>
    #include <cstdlib>
    #include <thread>

    queue<int> q;

    void consumer()
    {
        while (true)
        {
            int data;
            if (q.consume(data))
            {
                // ..
            }
        }
    }

    void producer()
    {
        while (true)
        {
            q.produce(std::rand());
        }
    }

    int main()
    {
        std::thread p(producer); std::thread c(consumer);
        p.detach(); c.detach();
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }

EDIT: Okay, the problem is solved now. It dawned on me that the old queue is provably outdated once an item has been pushed to the new queue. The snippet now looks like this:

    bool consume(T & data)
    {
        if (old)
        {
            if (old->consume(data))
            {
                return true;
            }
        }
        // Note that if the old queue is empty and the new one has an enqueued element, we can
        // conclusively prove that it is safe to delete the old queue, since (a) it is empty and
        // (b) the producer's thread state has been updated such that it uses the new queue and
        // will never use the old one again.
        // If we successfully dequeue an element, we can delete the old queue (if it exists).
        if (current.load()->consume(data))
        {
            if (old)
            {
                delete old;
                old = nullptr;
            }
            return true;
        }
        return false;
    }

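One interleaving may be worth double-checking in the snippet above. `old->consume()` is tried before the new queue is examined, so a `produce()` call that loaded the old pointer before `grow()` could still push into the old queue after that check. If the next `produce()` then lands on the new queue before the consumer looks at it, the old queue would be deleted with an element still in it. A minimal sketch of one way to close that window follows, assuming the internal queue can expose a consumer-side `empty()` (hypothetical, not part of the original `qimpl`): observe the new queue as non-empty first, and only then make the final check of the old queue. `qimpl_stub`, `growable_queue`, `producer_load()` and `replay_stalled_producer()` are illustrative names, and the stub is a plain `std::deque`, not a lock-free queue.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <vector>

// Stand-in for the real lock-free SPSC queue. NOT thread-safe: it only exists
// so the ordering can be stepped through deterministically on one thread.
template <typename T>
struct qimpl_stub {
    std::deque<T> items;
    bool produce(const T& v) { items.push_back(v); return true; }
    bool consume(T& out) {
        if (items.empty()) return false;
        out = items.front();
        items.pop_front();
        return true;
    }
    bool empty() const { return items.empty(); }  // hypothetical consumer-side peek
};

template <typename T>
class growable_queue {
public:
    using impl = qimpl_stub<T>;
    growable_queue() : current(new impl()), old(nullptr) {}
    ~growable_queue() { delete old; delete current.load(); }

    // Producer side, split in two so a replay can "stall" between the steps.
    impl* producer_load() { return current.load(); }
    bool produce(const T& v) { return producer_load()->produce(v); }

    // Consumer only. Refuses to grow while a previous old queue is pending.
    bool grow() {
        if (old) return false;
        old = current.load();
        current.store(new impl());
        return true;
    }

    // Consumer only.
    bool consume(T& out) {
        if (old) {
            // Step 1: observe the new queue BEFORE the final look at the old one.
            // An element there means the producer has started a later produce()
            // call, so every call that used the old queue has already finished.
            const bool producer_moved = !current.load()->empty();
            // Step 2: drain the old queue first to keep FIFO order.
            if (old->consume(out)) return true;
            // Step 3: old is empty and the producer had moved on before step 2.
            if (producer_moved) { delete old; old = nullptr; }
        }
        return current.load()->consume(out);
    }

private:
    std::atomic<impl*> current;
    impl* old;
};

// Single-threaded replay of one interleaving: the producer loads the old
// pointer, the consumer grows, and only then does the stalled call complete.
std::vector<int> replay_stalled_producer() {
    growable_queue<int> q;
    auto* stale = q.producer_load();  // producer loaded the pointer, then stalled
    q.grow();                         // consumer swaps in a new queue
    int v = 0;
    bool got = q.consume(v);          // both queues empty: old must be kept
    assert(!got);
    stale->produce(1);                // stalled call finishes on the old queue
    q.produce(2);                     // next call lands on the new queue
    std::vector<int> out;
    while (q.consume(v)) out.push_back(v);
    return out;
}
```

In a real implementation the argument relies on `empty()` reading the producer's write index with acquire semantics, so that seeing an element in the new queue also makes the producer's earlier writes to the old queue visible to the consumer. The single-threaded replay cannot reproduce the window inside one `consume()` call; it only checks that elements come out in FIFO order and that nothing in the old queue is dropped.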