0
votes

Multiple producers single consumer scenario, except consumption happens once and after that the queue is "closed" and no more work is allowed. I have a MPSC queue, so I tried to add a lock-free algorithm to "close" the queue. I believe it's correct and it passes my tests. The problem is when I try to optimise memory order it stops working (I think work is lost, e.g. enqueued after the queue is closed). Even on x64 which has "kind of" strong memory model, even with a single producer.

My attempt to fine-tune memory order is commented out:

// thread-safe for multi producers single consumer use
// linked-list based, and so it's growable 
MPSC_queue work_queue;
std::atomic<bool> closed{ false };
std::atomic<int32_t> producers_num{ 0 };
bool produce(Work&& work)
{
    bool res = false;

    ++producers_num;
    // producers_num.fetch_add(1, std::memory_order_release);
    if (!closed)
    // if (!closed.load(std::memory_order_acquire))
    {
        work_queue.push(std::move(work));
        res = true;
    }
    --producers_num;
    // producers_num.fetch_sub(1, std::memory_order_release);

    return res;
}
void consume()
{
    closed = true;
    // closed.store(true, std::memory_order_release);

    while (producers_num != 0)
    // while (producers_num.load(std::memory_order_acquire) != 0)
        std::this_thread::yield();

    Work work;
    while (work_queue.pop(work))
        process(work);
}

I also tried std::memory_order_acq_rel for read-modify-write ops on producers_num, doesn't work either.

A bonus question:

This algorithm is used with MPSC queue, which already does some synchronisation inside. It would be nice to combine them for better performance. Do you know any such algorithm for "closable" MPSC queue?

1
What the heck does work_queue.push do? Is that thread-safe? You seem to have left out key details like declarations of the variables.Peter Cordes
it’s MPSC queue - lock-free multi producers single consumerAndriy Tylychko
@PeterCordes: added missing detailsAndriy Tylychko
Since you only want to use the queue once, it doesn't need to wrap around and can probably be vastly simpler. Like an atomic producer-position counter that writers increment to claim a spot, and if they get a position > size then the queue was full.Peter Cordes
@PeterCordes: not sure I follow. the queue is never full as it's linked-list based. this is probably an important detail, added to the questionAndriy Tylychko

1 Answers

2
votes

I think closed = true; does need to be seq_cst to make sure it's visible to other threads before you check producers_num the first time. Otherwise this ordering is possible:

  • producer: ++producers_num;
  • consumer: producers_num == 0
  • producer: if (!closed) finds it still open
  • consumer: close.store(true, release) becomes globally visible.
  • consumer: work_queue.pop(work) finds the queue empty
  • producer: work_queue.push(std::move(work)); adds work to the queue after consumer has stopped looking.

You can still avoid seq_cst if you have the consumer check producers_num == 0 before returning, like

    while (producers_num != 0)
    // while (producers_num.load(std::memory_order_acquire) != 0)
        std::this_thread::yield();

    do {
        Work work;
        while (work_queue.pop(work))
            process(work);
    } while(producers_num.load(acquire) != 0);
    // safe if pop included a full barrier, I think

I'm not 100% sure I have this right, but I think checking producer_num after a full barrier is sufficient.

However, the producer side does need ++producers_num; to be at least acq_rel, otherwise it can reorder past if (!closed). (An acquire fence after it, before if(!closed) might also work).


Since you only want to use the queue once, it doesn't need to wrap around and can probably be quite a lot simpler. Like an atomic producer-position counter that writers increment to claim a spot, and if they get a position > size then the queue was full. I haven't thought through the full details, though.

That might allow a cleaner solution to the above problem, perhaps by having the consumer look at that write index to see if there were any producer