0
votes

I want to synchronize the output of two sensors that work at different frame rates (~80 ms vs ~40 ms) in C++ using threads. The idea resembles the producer-consumer problem, but with two producers and one consumer, and without a buffer, because only the most recent data matters.

These are the points the solution should cover:

  1. Each sensor is read by its own thread.
  2. A main thread must always take the latest pair of readings from the sensors and process it.
  3. Reading one sensor should not block reading the other; that is, the reading threads should not share a mutex.
  4. The main (processing) thread should not block the reading threads while it works. I propose locking the data, making a local copy (which is faster than processing in place), unlocking, and processing the copy.
  5. If there is no new data, the main thread should wait for it.

This is a time diagram of the requested functionality.

And this is the pseudocode:

void getSensor1(Data& data)
{
    while (true)
    {
        mutex1.lock();
        //Read data from sensor 1
        mutex1.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(80 + (rand() % 5)));
    }
}

void getSensor2(Data& data)
{
    while (true)
    {
        mutex2.lock();
        //Read data from sensor 2
        mutex2.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(40 + (rand() % 5)));
    }
}

int main()
{
    Data sensor1;
    Data sensor2;

    std::thread threadGetScan(getSensor1, std::ref(sensor1));
    std::thread threadGetFrame(getSensor2, std::ref(sensor2));

    while(true)
    {
        // Wait for new data, lock, copy, unlock and process it
        std::this_thread::sleep_for(std::chrono::milliseconds(100 + (rand() % 25)));
    }

    return 0;
}

Thanks in advance.

2
What should the effect be if ~80 = 79 and ~40 = 41 in terms of synchronization? Should the slowest dictate the data report rate? - Ted Lyngmo
Re, "I want to synchronize..." and "reading...one sensor should not block...the other." Those sound like incompatible ideas. If reading one does not block the other, that means you are reading them asynchronously. If I wanted to write code that did what you said, I would consider using a single loop that repeats every 80ms, and that reads the one sensor once every trip around the loop, and reads the other twice, 40ms apart. - Solomon Slow
Maybe a promise or future might help here.. Not too sure. - Brandon
When actually looking at the code: Use synchronizing entities, not random sleeps. And on the topic of randomization, don't use rand(), use <random>. - Ted Lyngmo
And if you don't need statistical data, but only the most current, the buffer needs to hold only one sample per sensor. Have a thread per sensor if you will. Read the sensor and atomically update its variable. If the sensors are to be reported as a package, read all of them atomically and report the state. This reader's frequency can be set independently of the individual sensors' reporting accuracy, but there will be drift. - Ted Lyngmo

2 Answers

1
votes

Since each sensor is only read from one thread, the mutex around the sensor access serves no purpose. You can get rid of that. Where you need thread safety is the means by which the thread which has read from a sensor passes data to the thread which is consuming it.

Have the thread reading from the sensor use only local variables, or variables only accessed by that thread, for its work of reading the sensor. Once it has the data completely, then put that data (or better yet, a pointer to the data) into a shared queue that the consuming thread will get it from.

Since you need to save only the latest data, your queue can have a maximum size of 1, which means it can be just a pointer.

Access to this shared data structure should be protected with a mutex. But since it is just a single pointer, you can use std::atomic.
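For comparison, the mutex-protected version of such a one-element slot could be sketched as follows. This is only an illustration under stated assumptions: `LatestSlot`, `put`, `take`, and the one-field `Data` struct are hypothetical names that do not appear in the question, and `std::optional` requires C++17.

```cpp
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical payload; the question does not define Data.
struct Data { int value = 0; };

// One-element "queue": a newer sample overwrites an unconsumed older one.
class LatestSlot {
public:
    // Called by the sensor thread once a sample is complete.
    void put(Data d) {
        std::lock_guard<std::mutex> lock(m_);
        slot_ = std::move(d);
    }

    // Called by the consumer: returns the pending sample and empties the
    // slot, or std::nullopt if nothing new has arrived.
    std::optional<Data> take() {
        std::lock_guard<std::mutex> lock(m_);
        std::optional<Data> out = std::move(slot_);
        slot_.reset();  // a moved-from optional still reports has_value()
        return out;
    }

private:
    std::mutex m_;
    std::optional<Data> slot_;
};
```

The lock is held only for the move in and out of the slot, so neither side is blocked for the duration of a sensor read or of the processing step. With one `LatestSlot` per sensor, the two reader threads never share a mutex.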

The reading thread could look like this:

#include <atomic>
#include <chrono>
#include <thread>

using namespace std::chrono_literals;  // enables the 80ms / 100ms literals

void getData(std::atomic<Data*>& dataptr) {
    while (true) {
        Data* mydata = new Data;  // local variable!
        // stuff to put data into mydata
        std::this_thread::sleep_for(80ms);
        // Important! This is the only line that uses dataptr, and it is atomic.
        Data* olddata = std::atomic_exchange(&dataptr, mydata);
        // In case the old data was never consumed, don't leak it.
        // (delete on a null pointer is a no-op, so no check is needed.)
        delete olddata;
    }
}

And the main thread could look like this:

void main_thread(void) {
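    // Initialize explicitly: before C++20 a default-constructed std::atomic
    // holds an indeterminate value, which the threads would later delete.
    std::atomic<Data*> sensorData1{nullptr};
    std::atomic<Data*> sensorData2{nullptr};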

    std::thread sensorThread1(getData, std::ref(sensorData1));
    std::thread sensorThread2(getData, std::ref(sensorData2)); 

    while (true) {
       std::this_thread::sleep_for(100ms);
       Data* data1 = std::atomic_exchange(&sensorData1, (Data*)nullptr);
       Data* data2 = std::atomic_exchange(&sensorData2, (Data*)nullptr);
       // Use data1 and data2 (either may be nullptr if that sensor has
       // produced nothing new since the last iteration)
       delete data1;
       delete data2;
    }
}
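
The hand-off above relies on every exchanged raw pointer being deleted exactly once. One way to make that ownership harder to get wrong is to wrap the two exchanges in small helpers that speak `std::unique_ptr`. The following is a minimal sketch, not part of the original answer: `publish`, `take`, and the one-field `Data` struct are hypothetical names introduced for illustration.

```cpp
#include <atomic>
#include <memory>

// Hypothetical payload; the question does not define Data.
struct Data { int value = 0; };

// Sensor side: store the fresh sample and free any sample that was
// never consumed.
void publish(std::atomic<Data*>& slot, std::unique_ptr<Data> fresh) {
    Data* stale = slot.exchange(fresh.release());
    delete stale;  // no-op when the slot was empty
}

// Consumer side: take ownership of the pending sample, leaving the slot
// empty. The result is null if nothing new has arrived.
std::unique_ptr<Data> take(std::atomic<Data*>& slot) {
    return std::unique_ptr<Data>(slot.exchange(nullptr));
}
```

As in the loops above, the consumer still has to check for a null result, and a sample left in the slot at shutdown must be taken (or deleted) once more to avoid a leak.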
0
votes

After some research, I found a solution that does what I wanted using mutexes and condition variables. The code I propose is below. Improvements and other suitable solutions are still welcome.

#include <iostream> 
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <cstdlib>

#define SIZE_LOOP 1000

// Struct that holds the synchronized data of both sensors
struct Data
{
    int data1; // Data of sensor 1
    int data2; // Data of sensor 2
};

std::mutex mtx1;              // Mutex to access sensor1 shared data
std::mutex mtx2;              // Mutex to access sensor2 shared data
std::condition_variable cv1;  // Condition variable to wait for sensor1 data availability
std::condition_variable cv2;  // Condition variable to wait for sensor2 data availability
bool ready1 = false;          // Flag to indicate sensor1 data is available (guarded by mtx1)
bool ready2 = false;          // Flag to indicate sensor2 data is available (guarded by mtx2)

// Function that continuously reads data from sensor 1
void getSensor1(int& data1)
{
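    // ready1 is already false at startup (namespace-scope variables are zero-initialized);
    // writing it here without holding mtx1 would race with waitSensor1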

    // Initial delay
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));

    // Reading loop (i represents an incoming new data)
    for(int i = 0; i < SIZE_LOOP; i++)
    {
        // Lock data access
        std::unique_lock<std::mutex> lck1(mtx1);
        // Read data
        data1 = i;
        std::cout << "Sensor1 (" << data1 << ")"<< std::endl;
        // Set data to ready
        ready1 = true;
        // Notify if processing thread is waiting
        cv1.notify_one();
        // Unlock data access
        lck1.unlock();
        // Sleep to simulate frame rate
        std::this_thread::sleep_for(std::chrono::milliseconds(2000 + (rand() % 500)));
    }
}

// Function that continuously reads data from sensor 2
void getSensor2(int& data2)
{
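    // ready2 is already false at startup (namespace-scope variables are zero-initialized);
    // writing it here without holding mtx2 would race with waitSensor2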

    // Initial delay
    std::this_thread::sleep_for(std::chrono::milliseconds(3000));

    // Reading loop (i represents an incoming new data)
    for(int i = 0; i < SIZE_LOOP; i++)
    {
        // Lock data access
        std::unique_lock<std::mutex> lck2(mtx2);
        // Read data
        data2 = i;
        std::cout << "Sensor2 (" << data2 << ")"<< std::endl;
        // Set data to ready
        ready2 = true;
        // Notify if processing thread is waiting
        cv2.notify_one();
        // Unlock data access
        lck2.unlock();
        // Sleep to simulate frame rate
        std::this_thread::sleep_for(std::chrono::milliseconds(1000 + (rand() % 500)));
    }
}

// Function that waits until sensor 1 data is ready
void waitSensor1(const int& dataRead1, int& dataProc1)
{
    // Lock data access
    std::unique_lock<std::mutex> lck1(mtx1);
    // Wait for new data
    while(!ready1)
    {
        //std::cout << "Waiting sensor1" << std::endl;
        cv1.wait(lck1);
    }
    //std::cout << "No Waiting sensor1" << std::endl;
    // Make a local copy of the data (decouples reading from processing, so both can run in parallel)
    dataProc1 = dataRead1;
    std::cout << "Copying sensor1 (" << dataProc1 << ")"<< std::endl;
    // Sleep to simulate copying load
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    // Set data flag to not ready
    ready1 = false;
    // Unlock data access
    lck1.unlock();
}

// Function that waits until sensor 2 data is ready
void waitSensor2(const int& dataRead2, int& dataProc2)
{
    // Lock data access
    std::unique_lock<std::mutex> lck2(mtx2);
    // Wait for new data
    while(!ready2)
    {
        //std::cout << "Waiting sensor2" << std::endl;
        cv2.wait(lck2);
    }
    //std::cout << "No Waiting sensor2" << std::endl;
    // Make a local copy of the data (decouples reading from processing, so both can run in parallel)
    dataProc2 = dataRead2;
    std::cout << "Copying sensor2 (" << dataProc2 << ")"<< std::endl;
    // Sleep to simulate copying load
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
    // Set data flag to not ready
    ready2 = false;
    // Unlock data access
    lck2.unlock();
}

// Main function
int main()
{
    Data dataRead;  // Data read
    Data dataProc;  // Data to process

    // Threads that read data from sensors 1 and 2, each at its own frame rate
    std::thread threadGetSensor1(getSensor1, std::ref(dataRead.data1));
    std::thread threadGetSensor2(getSensor2, std::ref(dataRead.data2));

    // Processing loop
    for(int i = 0; i < SIZE_LOOP; i++)
    {
        // Wait until data from sensor 1 and 2 is ready
        std::thread threadWaitSensor1(waitSensor1, std::ref(dataRead.data1), std::ref(dataProc.data1));
        std::thread threadWaitSensor2(waitSensor2, std::ref(dataRead.data2), std::ref(dataProc.data2));

        // Synchronize data/threads
        threadWaitSensor1.join();
        threadWaitSensor2.join();

        // Process synchronized data while sensors are throwing new data
        std::cout << "Init processing (" << dataProc.data1 << "," << dataProc.data2 << ")"<< std::endl;
        // Sleep to simulate processing load
        std::this_thread::sleep_for(std::chrono::milliseconds(10000 + (rand() % 1000)));
        std::cout << "End processing" << std::endl;
    }

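    // Join the reader threads: destroying a joinable std::thread calls std::terminate
    threadGetSensor1.join();
    threadGetSensor2.join();

    return 0;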
}
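
As one possible improvement, the duplicated mutex, condition variable, and flag for each sensor could be folded into a small class template, and the processing thread could wait on both sensors directly instead of starting two helper threads per iteration. The sketch below is an assumption-laden illustration rather than a drop-in replacement: `LatestValue`, `publish`, `waitAndTake`, and `runDemo` are hypothetical names, and the demo publishes a single fixed value per sensor in place of a real read loop.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>

// Holds only the most recent value of one sensor, plus a "fresh" flag.
template <typename T>
class LatestValue {
public:
    // Sensor thread: overwrite the stored value and wake the consumer.
    void publish(T v) {
        {
            std::lock_guard<std::mutex> lock(m_);
            value_ = std::move(v);
            fresh_ = true;
        }
        cv_.notify_one();
    }

    // Processing thread: block until a value newer than the last one taken
    // is available, then return a copy made while the lock is held.
    T waitAndTake() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return fresh_; });
        fresh_ = false;
        return value_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    T value_{};
    bool fresh_ = false;
};

// Hypothetical demo: two producers publish once each; the caller acts as
// the processing thread and waits for both values.
std::pair<int, int> runDemo() {
    LatestValue<int> sensor1, sensor2;
    std::thread t1([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        sensor1.publish(1);
    });
    std::thread t2([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        sensor2.publish(2);
    });
    int a = sensor1.waitAndTake();  // blocks until sensor 1 has published
    int b = sensor2.waitAndTake();  // sensor 2 keeps running meanwhile
    t1.join();
    t2.join();
    return {a, b};
}
```

Waiting on the two objects one after the other does not stall either producer, because each `LatestValue` has its own mutex and a producer only holds it for the duration of the assignment. The predicate form of `wait` also covers spurious wakeups, which the hand-written `while (!ready)` loops handle in the code above.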