
I would like to implement a slight modification of the usual producer-consumer problem, with an intermediate 'worker' thread(s), with a limited resource. An example application could be:

  • A producer thread reads records from a file and puts them in a queue. Once the end of file is reached, a notification should be sent to the worker thread(s).
  • One or more 'worker' threads pull records from the producer queue, do some sort of processing, and push the processed records to another queue. Once all records have been processed, a notification is sent to the consumer thread.
  • A single consumer thread writes the processed records to a file.

I'm not saying this is a good method to solve such a problem, but it highlights the issue I'm trying to tackle, namely how to notify the worker and consumer threads correctly.

I have a thread safe queue with the following interface:

template<class T>
class threadsafe_queue
{
public:
    threadsafe_queue() {}
    threadsafe_queue(const threadsafe_queue& other);
    void push(T new_value);
    void wait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    bool try_pop(T& value);
    std::shared_ptr<T> try_pop();
    bool empty() const;
};

My first idea to solve the problem with a single worker thread was to use two atomic bools, as follows:

#include <atomic>
#include <chrono>
#include <functional>   // std::ref
#include <iostream>
#include <thread>

void queue_producer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& producer_finished)
{
    for (unsigned i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    producer_finished.store(true);
    std::cout << "Producer finished." << std::endl;
}

void queue_processor(threadsafe_queue<unsigned>& in_queue, threadsafe_queue<unsigned>& out_queue,
                 std::atomic<bool>& producer_finished, std::atomic<bool>& processor_finished)
{
    unsigned value;
    while (!producer_finished.load()) {
        in_queue.wait_and_pop(value);
        value *= 10;
        out_queue.push(value);
    }
    processor_finished.store(true);
    std::cout << "Processor finished." << std::endl;
}

void queue_consumer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& processor_finished)
{
    unsigned value;
    while (!processor_finished.load()) {
        queue.wait_and_pop(value);
        std::cout << "Received value " << value << "." << std::endl; // Or write to file etc.
    }
    std::cout << "Consumer finished." << std::endl;
}

int main(int argc, const char * argv[])
{
    std::atomic<bool> producer_finished(false);
    std::atomic<bool> processor_finished(false);

    threadsafe_queue<unsigned> in_queue;
    threadsafe_queue<unsigned> out_queue;

    std::thread producer_thread(queue_producer, std::ref(in_queue), std::ref(producer_finished));
    std::thread processor_thread(queue_processor, std::ref(in_queue), std::ref(out_queue), std::ref(producer_finished), std::ref(processor_finished));
    std::thread consumer_thread(queue_consumer, std::ref(out_queue), std::ref(processor_finished));

    producer_thread.join();
    processor_thread.join();
    consumer_thread.join();

    return 0;
}

The problem with this is that the processor (and consumer) can re-enter the while loop before the atomic bool is set, and hence indefinitely wait for a record that will never come.

I also thought a solution could be to have some sort of sentinel value that is pushed onto the queue to signify the end (could use a wrapper class), but this doesn't seem a particularly nice way to do things, and it wouldn't work for the multi-worker version. I'm actually thinking that the multi-worker version is a much more difficult problem, so any help with the single worker version would be a great start.

Did you consider <condition_variable>? - Christophe
"Multiworker is much more difficult": indeed! There are a couple of articles on lock-free queues (example: drdobbs.com/parallel/lock-free-queues/208801974), but generally with one producer and one consumer. - Christophe
Is the sleep_for() meant to give other threads the opportunity to become active, or do you really have to wait? In the first case, you could use yield() to avoid waiting for nothing. - Christophe
Yes, I guess I could also have used yield(), although I'm not sure what behaviour that would give if you have more cores than threads when it's called... - Daniel

2 Answers

0
votes

Mailbox – to solve Producer/Consumer problem

You can use a thread synchronization primitive called a message box (also known as a mailbox) to link the producer and consumer threads together. A mailbox can be implemented using a C++11 mutex and condition variable.

A mailbox provides a means for two threads to exchange information. Typically, one thread produces messages and sends them to another thread for processing. The messages are held inside the mailbox, which guards all access to them, so the exchange is thread safe.

Producer

mbox.Put(message);   

Attempts to place a message in the specified mailbox. If the mailbox is full, the call may or may not block, depending on the design.

Consumer

message = mbox.Get();   // Blocking

Removes a message from the specified mailbox when it is available and returns (the address or moved copy of) the message.

Code snippets for Put and Get methods:

/// <summary>Push an item to the back of the queue. Move.</summary>
void Put(T && itemToAddByMoving)
{
    {
        std::lock_guard<std::mutex> lg(m_queueMutex);
        m_deque.push_back(std::move(itemToAddByMoving));   // T is the class parameter, so T&& is a plain rvalue reference: move it
    }   // release lock
    m_queueCondVar.notify_one();    // Notify consumer thread that there is data available in the queue.
}

/// <summary>Waiting for the queue to have data ready (not empty). Pop the first element when ready.</summary>
/// <returns>First element in front of the queue.</returns>
T Get()
{
    T poppedValue;      // T should support move.
    {
        std::unique_lock<std::mutex> ul(m_queueMutex);
        m_queueCondVar.wait(ul, [&]{ return !m_deque.empty(); });   // Wait here. Blocking.
        poppedValue = std::move(m_deque.front());   // Move out of the queue instead of copying
        m_deque.pop_front();
    }   // release lock
    return poppedValue;
}
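
The snippets above do not say how the consumer learns that no more messages will come, which is what the question asks about. One possible extension, as a minimal sketch only: add a Close() method and let Get() return an empty std::optional once the mailbox is closed and drained. The names ClosableMailbox, Close and run_pipeline are my own for illustration, and std::optional assumes C++17 rather than C++11.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical mailbox with an explicit end-of-stream signal.
template <class T>
class ClosableMailbox {
public:
    void Put(T item) {
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            assert(!m_closed && "Put after Close is a usage error");
            m_deque.push_back(std::move(item));
        }
        m_condVar.notify_one();
    }

    // Called once by the producing side: no more items will arrive.
    void Close() {
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_closed = true;
        }
        m_condVar.notify_all();  // wake every waiting consumer
    }

    // Blocks until an item is available, or returns empty once closed and drained.
    std::optional<T> Get() {
        std::unique_lock<std::mutex> ul(m_mutex);
        m_condVar.wait(ul, [this] { return !m_deque.empty() || m_closed; });
        if (m_deque.empty()) return std::nullopt;
        T value = std::move(m_deque.front());
        m_deque.pop_front();
        return value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condVar;
    std::deque<T> m_deque;
    bool m_closed = false;
};

// Producer -> single worker (multiplies by 10) -> consumer, as in the question.
std::vector<unsigned> run_pipeline(unsigned count) {
    ClosableMailbox<unsigned> in_box, out_box;
    std::vector<unsigned> results;

    std::thread producer([&] {
        for (unsigned i = 0; i < count; ++i) in_box.Put(i);
        in_box.Close();  // end of input
    });
    std::thread worker([&] {
        while (auto v = in_box.Get()) out_box.Put(*v * 10);
        out_box.Close();  // forward the end-of-stream signal
    });
    std::thread consumer([&] {
        while (auto v = out_box.Get()) results.push_back(*v);
    });

    producer.join();
    worker.join();
    consumer.join();
    return results;
}
```

Because the closed flag is changed and tested under the same mutex as the queue, a consumer cannot miss the signal between checking the flag and going to sleep, which is the race in the atomic-bool version.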
0
votes

You could consider <condition_variable> with the principle of notifications, in conjunction with <mutex>.

You need somewhere:

std::mutex mtx;                     // for locking  
std::condition_variable ready;      // for waiting conditions 

On the producer side, you process the input and protect the update of the queue:

{
        std::lock_guard<std::mutex> guard(mtx);     // lock
        // ... update the queue
        ready.notify_all();                 // notify consumers
}                                           // the lock is released here

On the consumer side you'd have a loop with:

{
    std::unique_lock<std::mutex> guard(mtx);        // lock
                                                    // wait condition
    ready.wait(guard, [&]() {return /* queue not empty */; });  // lock is released while waiting and reacquired before the condition is checked
    //... here the lock is held and the condition is true: read element from queue
    //... if processing is long, release the lock once the element has been read
}    // otherwise the lock is released here
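
To make this concrete for the multi-worker case, here is a sketch under my own assumptions: the wait condition becomes "queue not empty OR done", and an atomic counter lets the last worker to leave signal the consumer. Channel, send, finish, receive and run_with_workers are hypothetical names, not part of any library.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical shared state: mtx and ready as above, plus a queue and a flag.
struct Channel {
    std::mutex mtx;
    std::condition_variable ready;
    std::queue<unsigned> items;
    bool done = false;  // set under mtx once no more items will be pushed
};

void send(Channel& ch, unsigned v) {
    {
        std::lock_guard<std::mutex> guard(ch.mtx);
        ch.items.push(v);
    }
    ch.ready.notify_one();
}

void finish(Channel& ch) {
    {
        std::lock_guard<std::mutex> guard(ch.mtx);
        ch.done = true;
    }
    ch.ready.notify_all();  // every waiting thread must re-check the condition
}

// Returns false once the channel is finished and empty.
bool receive(Channel& ch, unsigned& out) {
    std::unique_lock<std::mutex> guard(ch.mtx);
    ch.ready.wait(guard, [&] { return !ch.items.empty() || ch.done; });
    if (ch.items.empty()) return false;
    out = ch.items.front();
    ch.items.pop();
    return true;
}

// Feeds 0..count-1 through n_workers workers (x10) and returns the consumer's sum.
unsigned long run_with_workers(unsigned count, unsigned n_workers) {
    assert(n_workers > 0);  // with no workers nobody would ever finish `out`
    Channel in, out;
    std::atomic<unsigned> active(n_workers);
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < n_workers; ++w) {
        workers.emplace_back([&] {
            unsigned v;
            while (receive(in, v)) send(out, v * 10);
            // fetch_sub returns the previous value: 1 means this is the last worker.
            if (active.fetch_sub(1) == 1) finish(out);
        });
    }
    unsigned long sum = 0;
    std::thread consumer([&] {
        unsigned v;
        while (receive(out, v)) sum += v;
    });
    for (unsigned i = 0; i < count; ++i) send(in, i);  // this thread is the producer
    finish(in);
    for (auto& t : workers) t.join();
    consumer.join();
    return sum;
}
```

With several workers the output order is no longer guaranteed, which is why this sketch only returns a sum. Calling run_with_workers(10, 3) should give 450 regardless of scheduling.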

You can find a tutorial with a condition variable producer/consumer example here. Note the difference between notify_one() and notify_all(): I'm currently experimenting with it, and as far as I understand, the first is best when any consumer can handle any item (unspecialized consumers). The latter is appropriate if each consumer is specialized, i.e. has to find out whether it is able to process the input.

Other approach

Playing with your current code, I get the "Received" lines displayed with values from 10 to 90, showing that all threads have contributed, but then the programme hangs.

If I replace the sleep_for() with std::this_thread::yield() in order to avoid unnecessary waits, all three threads immediately declare that they have completed their work. Why? Because the producer runs much faster, and by the time the processor gets an opportunity to work, producer_finished is already true. The processor then doesn't even enter the loop.

So your code is heavily dependent on the order of execution. Keeping in mind that your queue is thread safe, you could improve the situation by modifying the while condition of the processor and consumer as follows:

while (!producer_finished.load() || !in_queue.empty()) {
...
while (!processor_finished.load() || !queue.empty()) {
...
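
One caveat: with the blocking wait_and_pop() in the loop body, a thread can still pop the last record, see the flag not yet set, and block forever on an empty queue. A sketch that avoids this polls with try_pop() and yields when nothing is available. The threadsafe_queue below is only a stand-in I wrote for the members used here, and run_polling_pipeline and the received vector are hypothetical additions so the result can be checked.

```cpp
#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for the question's threadsafe_queue (only the members used here).
template <class T>
class threadsafe_queue {
public:
    void push(T new_value) {
        std::lock_guard<std::mutex> lk(m_mutex);
        m_queue.push(std::move(new_value));
    }
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lk(m_mutex);
        if (m_queue.empty()) return false;
        value = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }
    bool empty() const {
        std::lock_guard<std::mutex> lk(m_mutex);
        return m_queue.empty();
    }
private:
    mutable std::mutex m_mutex;
    std::queue<T> m_queue;
};

void queue_processor(threadsafe_queue<unsigned>& in_queue, threadsafe_queue<unsigned>& out_queue,
                     std::atomic<bool>& producer_finished, std::atomic<bool>& processor_finished)
{
    unsigned value;
    // The flag is read before empty(), so once it is true every push is already visible.
    while (!producer_finished.load() || !in_queue.empty()) {
        if (in_queue.try_pop(value)) out_queue.push(value * 10);
        else std::this_thread::yield();  // nothing yet: poll again instead of blocking
    }
    processor_finished.store(true);
}

void queue_consumer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& processor_finished,
                    std::vector<unsigned>& received)
{
    unsigned value;
    while (!processor_finished.load() || !queue.empty()) {
        if (queue.try_pop(value)) received.push_back(value);
        else std::this_thread::yield();
    }
}

std::vector<unsigned> run_polling_pipeline(unsigned count)
{
    std::atomic<bool> producer_finished(false), processor_finished(false);
    threadsafe_queue<unsigned> in_queue, out_queue;
    std::vector<unsigned> received;

    std::thread producer([&] {
        for (unsigned i = 0; i < count; ++i) in_queue.push(i);
        producer_finished.store(true);  // set only after the last push
    });
    std::thread processor(queue_processor, std::ref(in_queue), std::ref(out_queue),
                          std::ref(producer_finished), std::ref(processor_finished));
    std::thread consumer(queue_consumer, std::ref(out_queue), std::ref(processor_finished),
                         std::ref(received));
    producer.join();
    processor.join();
    consumer.join();
    return received;
}
```

The trade-off is that idle threads spin instead of sleeping, so this is probably only reasonable when the queues are rarely empty.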

Apart from the condition_variable approach above, another approach could be to use std::promise<T> (e.g. for the processor to send its notification) and std::shared_future<T> (e.g. for the consumer to find out that it has something to process).
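
As a rough sketch of one way the promise/future idea might look, assuming the future is used only as a one-shot "upstream is done" signal: each stage fulfils a std::promise<void> when it finishes, and the next stage polls the matching std::shared_future<void> with a zero timeout. LockedQueue, is_set, drain and run_future_pipeline are hypothetical helpers written for this example.

```cpp
#include <chrono>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical minimal locked queue so the sketch is self-contained.
struct LockedQueue {
    std::mutex m;
    std::queue<unsigned> q;
    void push(unsigned v) {
        std::lock_guard<std::mutex> lk(m);
        q.push(v);
    }
    bool try_pop(unsigned& v) {
        std::lock_guard<std::mutex> lk(m);
        if (q.empty()) return false;
        v = q.front();
        q.pop();
        return true;
    }
};

// True once the corresponding promise has been fulfilled (never blocks).
bool is_set(const std::shared_future<void>& f) {
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Pops until the upstream stage has signalled completion and the queue is drained.
template <class Fn>
void drain(LockedQueue& in, const std::shared_future<void>& upstream_done, Fn handle) {
    for (;;) {
        const bool finished = is_set(upstream_done);  // sample BEFORE popping
        unsigned v;
        if (in.try_pop(v)) handle(v);
        else if (finished) break;  // signalled earlier and still empty: really done
        else std::this_thread::yield();
    }
}

std::vector<unsigned> run_future_pipeline(unsigned count) {
    LockedQueue in_q, out_q;
    std::promise<void> producer_done, processor_done;
    std::shared_future<void> producer_sig = producer_done.get_future().share();
    std::shared_future<void> processor_sig = processor_done.get_future().share();
    std::vector<unsigned> results;

    std::thread producer([&] {
        for (unsigned i = 0; i < count; ++i) in_q.push(i);
        producer_done.set_value();  // one-shot notification
    });
    std::thread processor([&] {
        drain(in_q, producer_sig, [&](unsigned v) { out_q.push(v * 10); });
        processor_done.set_value();
    });
    std::thread consumer([&] {
        drain(out_q, processor_sig, [&](unsigned v) { results.push_back(v); });
    });
    producer.join();
    processor.join();
    consumer.join();
    return results;
}
```

A shared_future can be copied to several workers, so the same signal could in principle be observed by all of them. A promise can only be set once, though, so with multiple workers the stage would still need something like a counter to decide who sets processor_done.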