I want to push integers onto my thread-safe queue implementation from multiple threads while another set of threads concurrently pops the inserted numbers. All of these operations must be thread-safe, and the queue must also have a fixed size, like a buffer: if the buffer is full, the push threads must wait for the pop threads to free a slot.

This is my implementation of the queue/buffer. It seems to work, but after a few iterations it stops and stays blocked without any error.

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

template <typename T>
class Queue
{
private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;

public:

    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);

        cond_.wait(mlock, [this]{return !queue_.empty();});

        auto val = queue_.front();
        queue_.pop();
        return val;
    }

    void pop(T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);

        cond_.wait(mlock, [this]{return !queue_.empty();});

        item = queue_.front();
        queue_.pop();
    }

    void push(const T& item, int buffer)
    {
        std::unique_lock<std::mutex> mlock(mutex_);

        while (queue_.size() >= buffer)
        {
            cond_.wait(mlock);
        }

        queue_.push(item);
        mlock.unlock();
        cond_.notify_one();
    }

    Queue()=default;
    Queue(const Queue&) = delete;            // disable copying
    Queue& operator=(const Queue&) = delete; // disable assignment

};

The buffer size is passed to push through the buffer parameter. This is an example of usage:

 void prepare(Queue<int>& loaded, int buffer, int num_frames)
 {
     for (int i = 0; i < num_frames; i++)
     {
         cout<< "push "<<i<<endl;
         loaded.push(i, buffer);
     }
 }

void load(vector<Frame>& movie, Queue<int>& loaded, int num_frames,
          int num_points, int buffer, int height, int width)
{
    for (int i = 0; i < num_frames; i++)
    {
        int num = loaded.pop();
        cout << "pop " << num << endl;
    }
}

int main()
{
    srand(time(NULL));

    int num_threadsXstage = 4;

    int width = 500;
    int height = 500;

    int num_points = width * height;

    int num_frames = 100;

    int frames_thread = num_frames/num_threadsXstage;

    int preset = 3;

    int buffer = 10;

    //Vectors of threads
    vector<thread> loader;

    //Final vector
    vector<Frame> movie;
    movie.resize(num_frames);

    //Working queues
    Queue<int> loaded;

    //Prepare loading queue task
    thread preparator(prepare, ref(loaded), buffer, num_frames);

    for (int i = 0; i < num_threadsXstage; i++)
    {
        //stage 1
        loader.push_back(thread(&load, ref(movie), ref(loaded), frames_thread,
                                num_points, buffer, height, width));

    }


    // JOIN
    preparator.join();

    join_all(loader);

    return 0;
}
You are missing a notify() in pop() to signal push() that it is clear to push. Also, as already pointed out, you need to keep holding the lock while notifying. - TFM

1 Answer

Your pop functions could allow a thread waiting to push to make forward progress, but they don't call any notify function. You must call the appropriate notify function any time you may make it possible for a thread blocked on the condition variable to make forward progress.

Although it's quite complex to explain why, you should either call notify_all or call notify_one while still holding the lock. It is theoretically possible to "wake the wrong thread" otherwise because you are using the same condition variable for two predicates (the queue is not empty and the queue is not full).
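
As a minimal sketch of the points above, assuming the Queue class from the question is kept largely as posted: both pop overloads notify after removing an element, and every notification uses notify_all, so it does not matter which kind of waiter gets woken. The buffer parameter is changed to std::size_t here to avoid a signed/unsigned comparison, and demo_sum is a hypothetical driver added only to exercise the class; neither change comes from the original code.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template <typename T>
class Queue
{
private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;  // shared by "not empty" and "not full"

public:
    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this]{ return !queue_.empty(); });
        T val = queue_.front();
        queue_.pop();
        cond_.notify_all();  // a slot is free: wake any blocked pushers
        return val;
    }

    void pop(T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this]{ return !queue_.empty(); });
        item = queue_.front();
        queue_.pop();
        cond_.notify_all();  // same fix as the other overload
    }

    void push(const T& item, std::size_t buffer)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this, buffer]{ return queue_.size() < buffer; });
        queue_.push(item);
        cond_.notify_all();  // an item is available: wake any blocked poppers
    }

    Queue() = default;
    Queue(const Queue&) = delete;            // disable copying
    Queue& operator=(const Queue&) = delete; // disable assignment
};

// Hypothetical driver: one producer pushes 0..num_items-1 while `consumers`
// threads each pop num_items / consumers values (num_items must divide evenly).
long long demo_sum(int num_items, int consumers, std::size_t buffer)
{
    Queue<int> q;
    std::atomic<long long> sum{0};

    std::thread producer([&]{
        for (int i = 0; i < num_items; ++i) q.push(i, buffer);
    });

    std::vector<std::thread> workers;
    for (int c = 0; c < consumers; ++c)
        workers.emplace_back([&]{
            for (int i = 0; i < num_items / consumers; ++i) sum += q.pop();
        });

    producer.join();
    for (auto& w : workers) w.join();
    return sum;
}
```

Calling demo_sum(100, 4, 10) mirrors the sizes in the question (100 frames, 4 loader threads, buffer of 10). notify_all wakes more threads than strictly necessary, which costs some efficiency but keeps the single-condition-variable design simple to reason about.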

To avoid very hard to understand failure modes, always do one of these three things:

  1. Do not use the same condition variable to handle more than one predicate. For example, use one condition variable for "not empty" and another for "not full";
  2. Always use notify_all, never notify_one; or
  3. Always call notify functions while holding the mutex.

So long as you follow at least one of these three rules, you will avoid an obscure failure mode where you wake only a thread that chose to sleep after you released the mutex while leaving the only thread that could handle the condition still blocked.
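
The first rule can be sketched as follows, assuming the capacity is fixed at construction instead of being passed to every push. The names BoundedQueue, not_empty_, not_full_, and run_demo are hypothetical and do not appear in the question. Each condition variable guards exactly one predicate, so notify_one can only ever wake a thread that is able to use the change; the notifications are also issued while the mutex is held, which satisfies the third rule as well.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class BoundedQueue
{
public:
    // capacity must be at least 1, otherwise every push blocks forever
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}
    BoundedQueue(const BoundedQueue&) = delete;
    BoundedQueue& operator=(const BoundedQueue&) = delete;

    void push(const T& item)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // Pushers wait only on "not full"
        not_full_.wait(lock, [this]{ return queue_.size() < capacity_; });
        queue_.push(item);
        not_empty_.notify_one();  // exactly one popper can take this item
    }

    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // Poppers wait only on "not empty"
        not_empty_.wait(lock, [this]{ return !queue_.empty(); });
        T val = std::move(queue_.front());
        queue_.pop();
        not_full_.notify_one();   // exactly one pusher can use the free slot
        return val;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    const std::size_t capacity_;
};

// Hypothetical driver: `threads` producers each push 0..per_thread-1 and
// `threads` consumers each pop per_thread values; returns the sum popped.
long long run_demo(int threads, int per_thread, std::size_t capacity)
{
    BoundedQueue<int> q(capacity);
    std::atomic<long long> sum{0};
    std::vector<std::thread> pool;

    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&]{
            for (int i = 0; i < per_thread; ++i) q.push(i);
        });
        pool.emplace_back([&]{
            for (int i = 0; i < per_thread; ++i) sum += q.pop();
        });
    }
    for (auto& th : pool) th.join();
    return sum;
}
```

Because the number of pushes equals the number of pops in run_demo, every thread eventually finishes; a real pipeline would also need some way to tell consumers that no more items are coming, which this sketch leaves out.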