I implemented a lock-free queue based on the algorithm specified in Maged M. Michael and Michael L. Scott work Simple, Fast, and Practical Non-Blocking and Blocking
Concurrent Queue Algorithms (for the algorithm, jump to page 4)
I used atomic operation on shared_ptr such as std::atomic_load_explicit etc.
when using the queue in one thread only, everything is fine, but when using it from different thread, I get a stack overflow exception.
I could not trace the source of the problem unfortunately. it seems that when one shared_ptr is getting out of scope, it decrements the number of references on the next ConcurrentQueueNode and it causes an infinite recursion, but I can't see why..
the Code:
the Queue node:
template<class T>
struct ConcurrentQueueNode {
T m_Data;
std::shared_ptr<ConcurrentQueueNode> m_Next;
template<class ... Args>
ConcurrentQueueNode(Args&& ... args) :
m_Data(std::forward<Args>(args)...) {}
std::shared_ptr<ConcurrentQueueNode>& getNext() {
return m_Next;
}
T getValue() {
return std::move(m_Data);
}
};
The concurrent queue (note: not for the faint hearted):
template<class T>
class ConcurrentQueue {
std::shared_ptr<ConcurrentQueueNode<T>> m_Head, m_Tail;
public:
ConcurrentQueue(){
m_Head = m_Tail = std::make_shared<ConcurrentQueueNode<T>>();
}
template<class ... Args>
void push(Args&& ... args) {
auto node = std::make_shared<ConcurrentQueueNode<T>>(std::forward<Args>(args)...);
std::shared_ptr<ConcurrentQueueNode<T>> tail;
for (;;) {
tail = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire);
std::shared_ptr<ConcurrentQueueNode<T>> next =
std::atomic_load_explicit(&tail->getNext(),std::memory_order_acquire);
if (tail == std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)) {
if (next.get() == nullptr) {
auto currentNext = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)->getNext();
auto res = std::atomic_compare_exchange_weak(&tail->getNext(), &next, node);
if (res) {
break;
}
}
else {
std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
}
}
}
std::atomic_compare_exchange_strong(&m_Tail, &tail, node);
}
bool tryPop(T& dest) {
std::shared_ptr<ConcurrentQueueNode<T>> head;
for (;;) {
head = std::atomic_load_explicit(&m_Head, std::memory_order_acquire);
auto tail = std::atomic_load_explicit(&m_Tail,std::memory_order_acquire);
auto next = std::atomic_load_explicit(&head->getNext(), std::memory_order_acquire);
if (head == std::atomic_load_explicit(&m_Head, std::memory_order_acquire)) {
if (head.get() == tail.get()) {
if (next.get() == nullptr) {
return false;
}
std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
}
else {
dest = next->getValue();
auto res = std::atomic_compare_exchange_weak(&m_Head, &head, next);
if (res) {
break;
}
}
}
}
return true;
}
};
example usage that reproduces the problem :
int main(){
ConcurrentQueue<int> queue;
std::thread threads[4];
for (auto& thread : threads) {
thread = std::thread([&queue] {
for (auto i = 0; i < 100'000; i++) {
queue.push(i);
int y;
queue.tryPop(y);
}
});
}
for (auto& thread : threads) {
thread.join();
}
return 0;
}
truefor a call tostd::atomic_is_lock_free( &ptr )whereptris somestd::shared_ptr<>instance. - cr_oag