1
votes

I have one producer thread and several consumers, every consumer has: own queue with data and unique id. I use std::map to identify each queue for thread.

typedef std::map<int, std::queue<Task>> TaskMap;
TaskMap inputQueue;
TaskMap outputQueue;

Each consumer thread work with data in his queue, if queue is empty, thread must wait for data. If i want to do it with only one thread, i can use std::condition_variable with std::unique_lock, but i have several consumers so i need several std::condition_variable, but i cannot save them in containers (copy/assignment are deleted). So i use code like this

while(q.empty()) {
    std::cout << "waiting...\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

Where q is reference to queue. But how can i synchronize it with better way? Thanks in advance. P.S. Queue will always has data, last data must say 'exit'.

4
Lack of copy/assignment doesn't mean you can't put things into container. Only some of the operations will be unavailable. You can still use emplace and at. - zch
Please take a look at this. - Pooja Nilangekar
"but i have several consumers so i need several std::condition_variable" not true. - Jonathan Wakely
So how can i identify thread which i need to wake? - user3365834
@user3365834 you seriuosly need a thread pool here. don't try to re-invent the wheel. - David Haim

4 Answers

0
votes

invoke OOP to this task:

class ConcurentTaskQueue{
    std::mutex lock;
    std::condition_variable m_ConditionVariable;
    std::queue<Task> m_TaskQueue;


public:
    Task getTask(){
        std::unique_lock<std::mutex> synchLock (lock); //NOTE: consider doing this with a while(programIsRunning){} loop
        while(m_TaskQueue.empty()){
            m_ConditionVariable.wait(synchLock);
        } 
        Task task(std::move(m_TaskQueue.front()));
        m_TaskQueue.pop();
        return task;
    }

    void addTask (Task task){
        std::unique_lock<std::mutex> synchLock (lock);
        m_TaskQueue.push(std::move(task));
        m_ConditionVariable.notify_one();
    }
};

and now simply:

std::map<size_t,ConcurentTaskQueue> inputQueue;
std::thread producer ([&]{
      Task task = produceTask()
      inputQueue[ID].addTask(task);
});

std::thread consumer1([&]{
      Task task = inputQueue[ID].getTask();
});


std::thread consumer2([&]{
      Task task = inputQueue[ID].getTask();
});

EDIT2: use a threadpool

2
votes

Because each consumer has its own queue and there is only one producer for all consumers, it is essentially one-producer-one-consumer scenario.

In other words, you do not have a single queue shared between all consumers.

1
votes

One std::conditional_variable with one std::mutex should be enough.

Task t;
{
  std::unique_lock<std::mutex> lock(mtx);
  while (q.empty())
    cond_var.wait(lock);
  t = std::move(q.front());
  q.pop_front();
}

and main thread will be doing

{
  std::lock_guard<std::mutex> lock(mtx);
  q.emplace_front(/*...*/);
  cond_var.notify_all();
}

Main thread will wake up all threads, but most of them will go back to sleep, since their queue is still empty.

0
votes

If you are not doing this for research / learning but for production code, I would recommend using one of the many implementations. Implementing this correctly, efficiently and scalable is not that easy, but the problems have been solved by other people. There are many options, e.g.