0
votes

I have a thread (producer) that acquires data from some source every t mSec. Once the data is acquired and ready, other threads (consumers) should get the data and do some processing on it.

However, There is no guarantees which thread is faster (the producer may be slower or faster than the consumers.

What I did:

//Producer
while(is_enabled_) {
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    std::unique_lock<std::mutex> lk(mutex_);
    ready_ = false;
    //acquiring the data
    ready_ = true;
    lk.unlock();
    cv_.notify_all();
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_milliseconds < duration ? 0 : sleep_milliseconds - duration));
}

Considering the consumers all do the same:

//A consumer
while(is_enabled_){
    std::unique_lock<std::mutex> lk(mutex_);
    cv_.wait(lk, [this] {return this->ready_; });
    //Process the data
}

I do not have queues. Only the last acquired data should be processed and only once by each consumer. If some data was acquired and no consumer found time to process it, the data is dropped off and another data overwrites it by the producer.

On the other hand, if the consumers were faster than the producer, they should wait until a new data is ready instead of processing the old one.

The problem I am facing that the consumers are using the same old data produced by the producer if the producer was not quick enough to produce new data.

What does my implementation lack?

1
Have a global shared_pointer pointing to the most recent data packet. Producer would prepare a new packet and atomic_store it into the global. Consumer would atomic_load it (or atomic_exchange with null pointer, if only one consumer should get it).Igor Tandetnik
What, if any, is the relationship between mutex_ and point_cloud_mutex_, and between cv_ and point_cloud_cv_? Why is consumer waiting for point_cloud_ready_ to become true, when no one is setting it?Igor Tandetnik
What is the error you are receiving?Jake Freeman
@IgorTandetnik Sorry my bad, it is a typoHumam Helfawi
You leave ready_ == true even after the data is consumed. So the next wait call will be satisfied right away, and see the same data.Igor Tandetnik

1 Answers

0
votes

You could due something like this to achieve your goals:

Global variable: std::vector<bool> newData;

For the producers:

while(is_enabled_) {
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    std::unique_lock<std::mutex> lk(mutex_);
    ready_ = false;
    //acquiring the data
    ready_ = true;
   std::fill(newData.begin(), newData.end(), true);
    lk.unlock();
    cv_.notify_all();
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_milliseconds < duration ? 0 : sleep_milliseconds - duration));
}

In the consumers before the while loop do:

int index;
std::unique_lock<std::mutex> lk(mutex_);
index = newData.size();
newData.push_back(false);
lk.unlock();

Then the body is like this

while(is_enabled_){
    std::unique_lock<std::mutex> lk(mutex_);
    if(newData[index]) {
        cv_.wait(lk, [this] {return this->ready_; });
        newData[index] = false;
    //Process the data
    }else  {
          lk.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(50);
    }
}

Hope this helps.