I would like to implement a slight variation of the usual producer-consumer problem, with one or more intermediate 'worker' threads and a limited resource. An example application could be:
- A producer thread reads records from a file and puts them in a queue. Once the end of the file is reached, a notification should be sent to the worker thread(s).
- One or more 'worker' threads pull records from the producer queue, do some sort of processing, and push the processed records to a second queue. Once all records have been processed, a notification is sent to the consumer thread.
- A single consumer thread writes the processed records to a file.
I'm not claiming this is a good way to solve such a problem, but it highlights the issue I'm trying to tackle: how to correctly notify the worker and consumer threads that no more records are coming.
I have a thread-safe queue with the following interface:
#include <memory>  // std::shared_ptr

template<class T>
class threadsafe_queue
{
public:
    threadsafe_queue() {}
    threadsafe_queue(const threadsafe_queue& other);
    void push(T new_value);
    void wait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    bool try_pop(T& value);
    std::shared_ptr<T> try_pop();
    bool empty() const;
};
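The interface alone doesn't show how the blocking calls work. A minimal sketch of one common way to implement it, assuming a std::queue guarded by a std::mutex and a std::condition_variable (the private member names mut, data and cond are my own, not part of the interface above):

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Sketch only: a std::queue protected by a mutex; the condition variable
// wakes threads blocked in wait_and_pop() when push() adds an element.
template<class T>
class threadsafe_queue
{
public:
    threadsafe_queue() {}
    threadsafe_queue(const threadsafe_queue& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);  // copy under the source's lock
        data = other.data;
    }
    void push(T new_value)
    {
        {
            std::lock_guard<std::mutex> lk(mut);
            data.push(std::move(new_value));
        }
        cond.notify_one();  // wake one thread waiting in wait_and_pop()
    }
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        cond.wait(lk, [this] { return !data.empty(); });  // blocks until non-empty
        value = std::move(data.front());
        data.pop();
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        cond.wait(lk, [this] { return !data.empty(); });
        auto res = std::make_shared<T>(std::move(data.front()));
        data.pop();
        return res;
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data.empty()) return false;  // never blocks
        value = std::move(data.front());
        data.pop();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data.empty()) return std::shared_ptr<T>();  // null means "nothing there"
        auto res = std::make_shared<T>(std::move(data.front()));
        data.pop();
        return res;
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data.empty();
    }
private:
    mutable std::mutex mut;  // mutable so const members (empty, copy source) can lock it
    std::queue<T> data;
    std::condition_variable cond;
};
```

Note that wait_and_pop() has no way to give up, which is exactly why a thread blocked in it can wait forever once the producer stops pushing.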
My first idea to solve the problem with a single worker thread was to use two atomic bools, as follows:
#include <atomic>
#include <chrono>
#include <functional>  // std::ref
#include <iostream>
#include <thread>
// threadsafe_queue<T> as declared above

void queue_producer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& producer_finished)
{
    for (unsigned i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    producer_finished.store(true);
    std::cout << "Producer finished." << std::endl;
}

void queue_processor(threadsafe_queue<unsigned>& in_queue, threadsafe_queue<unsigned>& out_queue,
                     std::atomic<bool>& producer_finished, std::atomic<bool>& processor_finished)
{
    unsigned value;
    while (!producer_finished.load()) {
        in_queue.wait_and_pop(value);  // blocks until a record arrives
        value *= 10;
        out_queue.push(value);
    }
    processor_finished.store(true);
    std::cout << "Processor finished." << std::endl;
}

void queue_consumer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& processor_finished)
{
    unsigned value;
    while (!processor_finished.load()) {
        queue.wait_and_pop(value);  // blocks until a processed record arrives
        std::cout << "Received value " << value << "." << std::endl; // Or write to file etc.
    }
    std::cout << "Consumer finished." << std::endl;
}

int main(int argc, const char * argv[])
{
    std::atomic<bool> producer_finished(false);
    std::atomic<bool> processor_finished(false);
    threadsafe_queue<unsigned> in_queue;
    threadsafe_queue<unsigned> out_queue;
    std::thread producer_thread(queue_producer, std::ref(in_queue), std::ref(producer_finished));
    std::thread processor_thread(queue_processor, std::ref(in_queue), std::ref(out_queue),
                                 std::ref(producer_finished), std::ref(processor_finished));
    std::thread consumer_thread(queue_consumer, std::ref(out_queue), std::ref(processor_finished));
    producer_thread.join();
    processor_thread.join();
    consumer_thread.join();
    return 0;
}
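The problem with this is that the processor (and likewise the consumer) can check the flag while it is still false, re-enter the while loop, and then block in wait_and_pop() after the producer has pushed its last record and set the flag, so it waits indefinitely for a record that will never come. Conversely, if the flag is set while records are still queued, the loop can exit and leave them unprocessed.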
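Using only the existing interface, one way around this race, sketched here under the assumption that a little busy-waiting is acceptable, is to read the flag *before* a non-blocking try_pop() and stop only when the flag was already true and the pop failed. Because the producer stores true only after its last push, an empty queue observed after seeing true means nothing more can arrive. The helpers drain and run_pipeline and the workers_left counter are hypothetical names; the counter lets the last worker to finish raise processor_finished, so the same code also covers several workers. The queue is a stripped-down stand-in with only the two members used.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for threadsafe_queue, reduced to the two members this sketch uses.
template<class T>
class threadsafe_queue {
public:
    void push(T v) { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(v)); }
    bool try_pop(T& v) {
        std::lock_guard<std::mutex> lk(m_);
        if (q_.empty()) return false;
        v = std::move(q_.front()); q_.pop();
        return true;
    }
private:
    std::mutex m_;
    std::queue<T> q_;
};

// Hypothetical helper: process items until the upstream flag was already set
// AND a subsequent pop found the queue empty.
template<class T, class F>
void drain(threadsafe_queue<T>& in, const std::atomic<bool>& upstream_done, F process)
{
    T value;
    for (;;) {
        bool done = upstream_done.load();  // read the flag BEFORE trying to pop
        if (in.try_pop(value)) { process(value); continue; }
        if (done) return;                  // flag was set and queue is empty: finished
        std::this_thread::yield();         // busy-wait; a condition variable avoids this
    }
}

std::vector<unsigned> run_pipeline(unsigned n_workers)
{
    assert(n_workers > 0);  // with zero workers nobody would set processor_finished
    threadsafe_queue<unsigned> in_queue, out_queue;
    std::atomic<bool> producer_finished(false), processor_finished(false);
    std::atomic<unsigned> workers_left(n_workers);
    std::vector<unsigned> received;  // written only by the consumer thread

    std::thread producer([&] {
        for (unsigned i = 0; i < 10; ++i) in_queue.push(i);
        producer_finished.store(true);  // only after the last push
    });
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < n_workers; ++w)
        workers.emplace_back([&] {
            drain(in_queue, producer_finished, [&](unsigned v) { out_queue.push(v * 10); });
            if (workers_left.fetch_sub(1) == 1)  // last worker out notifies the consumer
                processor_finished.store(true);
        });
    std::thread consumer([&] {
        drain(out_queue, processor_finished, [&](unsigned v) { received.push_back(v); });
    });
    producer.join();
    for (auto& t : workers) t.join();
    consumer.join();
    return received;
}
```

With one worker the output order matches the input order; with several workers the order is not guaranteed.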
I also thought a solution could be to push some sort of sentinel value onto the queue to signify the end (e.g. by wrapping records in a class), but this doesn't seem a particularly elegant way to do things, and I don't think it would work for the multi-worker version. I suspect the multi-worker version is a much more difficult problem, so any help with the single-worker version would be a great start.
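For the sentinel idea, one commonly used variant that also handles several workers, assuming the number of workers is known when the producer finishes, is to push one sentinel per worker; each worker stops at its first sentinel and forwards a sentinel of its own, and the consumer stops after receiving one per worker. In this sketch std::optional (C++17) plays the role of the wrapper class, with std::nullopt as the sentinel; the queue is a minimal stand-in and run_pipeline is a hypothetical name.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Minimal blocking stand-in for threadsafe_queue (push / wait_and_pop only).
template<class T>
class threadsafe_queue {
public:
    void push(T v) {
        { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(v)); }
        cv_.notify_one();
    }
    void wait_and_pop(T& v) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        v = std::move(q_.front()); q_.pop();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
};

using item = std::optional<unsigned>;  // std::nullopt is the end-of-stream sentinel

std::vector<unsigned> run_pipeline(unsigned n_workers)
{
    assert(n_workers > 0);
    threadsafe_queue<item> in_queue, out_queue;
    std::vector<unsigned> received;  // written only by the consumer thread

    std::thread producer([&] {
        for (unsigned i = 0; i < 10; ++i) in_queue.push(i);
        for (unsigned w = 0; w < n_workers; ++w) in_queue.push(std::nullopt);  // one per worker
    });
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < n_workers; ++w)
        workers.emplace_back([&] {
            item v;
            for (;;) {
                in_queue.wait_and_pop(v);
                if (!v) break;             // each worker consumes exactly one sentinel
                out_queue.push(*v * 10);
            }
            out_queue.push(std::nullopt);  // tell the consumer this worker is done
        });
    std::thread consumer([&] {
        item v;
        for (unsigned done = 0; done < n_workers;) {  // stop after one sentinel per worker
            out_queue.wait_and_pop(v);
            if (v) received.push_back(*v); else ++done;
        }
    });
    producer.join();
    for (auto& t : workers) t.join();
    consumer.join();
    return received;
}
```

Because the queue is FIFO, every real record is popped before any sentinel, and a worker that has taken a sentinel never pops again, so each of the N workers ends up with exactly one.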
<condition_variable>? - Christophe
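Following the condition-variable hint, another option is to give the queue an explicit "closed" state, so blocked poppers can be woken without putting any sentinel into the data. This is a sketch under stated assumptions: closable_queue and its close() member are hypothetical (not part of the interface in the question), and wait_and_pop() is changed to return false once the queue is closed and empty. The last worker to finish closes the output queue, which handles any number of workers.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical variant of threadsafe_queue: close() marks end of input, and
// wait_and_pop() returns false once the queue is closed AND drained.
template<class T>
class closable_queue {
public:
    void push(T v) {
        { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(v)); }
        cv_.notify_one();
    }
    void close() {
        { std::lock_guard<std::mutex> lk(m_); closed_ = true; }
        cv_.notify_all();  // wake every blocked popper so it can observe the close
    }
    bool wait_and_pop(T& v) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty() || closed_; });
        if (q_.empty()) return false;  // closed and drained: end of stream
        v = std::move(q_.front()); q_.pop();
        return true;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;  // guarded by m_
};

std::vector<unsigned> run_pipeline(unsigned n_workers)
{
    assert(n_workers > 0);  // with zero workers out_queue would never be closed
    closable_queue<unsigned> in_queue, out_queue;
    std::atomic<unsigned> workers_left(n_workers);
    std::vector<unsigned> received;  // written only by the consumer thread

    std::thread producer([&] {
        for (unsigned i = 0; i < 10; ++i) in_queue.push(i);
        in_queue.close();  // the "end of file" notification
    });
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < n_workers; ++w)
        workers.emplace_back([&] {
            unsigned v;
            while (in_queue.wait_and_pop(v)) out_queue.push(v * 10);
            if (workers_left.fetch_sub(1) == 1)  // last worker to finish closes the output
                out_queue.close();
        });
    std::thread consumer([&] {
        unsigned v;
        while (out_queue.wait_and_pop(v)) received.push_back(v);
    });
    producer.join();
    for (auto& t : workers) t.join();
    consumer.join();
    return received;
}
```

Because the closed flag is read and written under the same mutex as the queue contents, and the wait uses a predicate, a close() cannot slip in between a popper's check and its wait, which is the lost-notification race in the atomic-bool version.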