4
votes

I have a use case where a fast producer inserts data in a queue and a slow consumer consumes data. The problem I'm facing is continuous increase in queue size over time. I have a class implementation where a std::queue is protected by mutex and condition variable for concurrent reads and writes.

How can this adapted to case where the producer after reaching a MAX_THRESHOLD till stop inserting data into the queue and the consumer has consumed some amount of data signals the producer to insert data into the queue.

Could someone provide a sample implementation ?

Also with out changing the class implementation is it possible to solve this problem by adding another layer of syncronization in the producer and consumer ?

2

2 Answers

4
votes

Either:

a) Use a bounded queue class that blocks the producer if the queue size reaches MAX_THRESHOLD. This means changing the queue class, which you may not want.

b) Use a 'pool queue' - another unbounded blocking queue that you fill up with MAX_THRESHOLD objects at start up. The producer gets its objects from the pool, loads them, queues to producer. Producer gets objects from consumer, 'consumes' them and returns them to the pool. This kinda mandates using pointers or maybe references, which you may not want.

c) Use a semaphore, initialized with a MAX_THRESHOLD count, to represent message-tokens in a similar way to (b) - producer has to get a unit before queueing up, consumer posts a unit when it's done with a message object.

I tend to use (b).

3
votes

Code snippet of "bounded queue" with pthread :

#include <queue>
#include <pthread.h>

template <class T, size_t UpperLimit>
class BoundedQueue {
  std::queue<T> q_;
  pthread_mutex_t mtx_;
  pthread_cond_t cv_not_empry_;
  pthread_cond_t cv_not_full_;

  // lock/unlock helper
  struct auto_locker {
    auto_locker(pthread_mutex_t* pm) : pm_(pm)
      { pthread_mutex_lock(pm_); }
    ~auto_locker()
      { pthread_mutex_unlock(pm_);}
    pthread_mutex_t *pm_;
  };

public:
  BoundedQueue() { /* initialize member... */ }
  ~BoundedQueue() { /* uninitialize member...*/ }
  // for Producer
  void push(T x) {
    auto_locker lk(&mtx_);
    while (UpperLimit <= q_.size()) {
      pthread_cond_wait(&cv_not_full_, &mtx_);
    }
    q_.push(x);
    pthread_cond_broadcast(&cv_not_empry_);
    return ret;
  }
  // for Consumer
  T pop() {
    auto_locker lk(&mtx_);
    while (q_.empty()) {
      pthread_cond_wait(&cv_not_empry_, &mtx_);
    }
    T ret = q_.front();
    q_.pop();
    pthread_cond_broadcast(&cv_not_full_);
    return ret;
  }
}