2
votes

I am trying to include condition variables and channels in my multithreaded program, and have made a basic program to try and understand how they work. In this program, one thread will add numbers 0 to 9 to the channel buffer, and the other thread will display each number and pop it from the buffer.

Currently, the program runs but nothing is displayed. I suspect threads are waiting on a resource and so have entered deadlock, but I'm not sure how to fix this.

Source.cpp (threads are called):

#include "channel.h"
#include <iostream>
channel channel1;

void function1() {
    for (int i = 0; i < 10; i++) {
        channel1.write(to_string(i));
    }
}

void function2() {
    string val;
    for (int i = 0; i < 10; i++) {
        val = channel1.read();
        cout << val << "\n";
    }
}

    void main() {
        thread t1(function1);
        thread t2(function2);
        t1.join();
    t2.join();
    return;
}

channel.h (Methods for writing to/reading from buffer):

#pragma once
#include <mutex>
#include <list>
#include <string>
using namespace std;

typedef unique_lock<mutex> mutex_lock;
class channel {
public:
    list<string> buffer;
    mutex buffer_mutex; // controls access to buffer
    condition_variable cv;

    void write(string data) {
        mutex_lock lock(buffer_mutex);
        buffer.push_back(data);
        cv.notify_all();
    }

    string read() {
        string item = "";
        while (item == "") {
            mutex_lock lock(buffer_mutex);
            cv.wait(lock);
            string item = buffer.front();
            buffer.pop_front();
            return item;
        }
    }
};

Any help much appreciated :)

2
Consider what happens when the first thread manages to stuff everything into the queue, and signal the condition variable very quickly even before the other thread reaches read(). The first thread signalled the condition variable, but nothing was waiting, but that's ok. Now, the other thread finally makes it into read(), and then starts wait()ing for the condition variable to be signalled. Where do you expect that signal to come from, now? Nothing ever will signal that condition variable, until our sun explodes. Think about it. - Sam Varshavchik
As always, a using directive at global scope in a header file is a firing offense. Just so you know. - Davis Herring

2 Answers

2
votes

See this code, I introduced a bool data_avail to make the intend clear, a time delay so that the writer don't lock all the time and the while item != "" is removed since it was deemed unnecessary.

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <string>
#include <chrono>

using namespace std;

typedef unique_lock<mutex> mutex_lock;

class channel {
public:
    list<string> buffer;
    mutex buffer_mutex; // controls access to buffer
    condition_variable cv;
    bool data_avail = false;

    void write(string data) {
        mutex_lock lock(buffer_mutex);
        buffer.push_back(data);
        data_avail = true;
        cv.notify_all();
    }

    string read() {
        string item ;
        mutex_lock lock(buffer_mutex);
        cv.wait(lock,[&](){ return data_avail;});
        string item = buffer.front();
        buffer.pop_front();
        data_avail = false;
        return item;
    }
};

channel channel1;

void function1() {
    for (int i = 0; i < 10; i++) {
        channel1.write(to_string(i));
        this_thread::sleep_for(chrono::milliseconds(100));
    }
}

void function2() {
    string val;
    for (int i = 0; i < 10; i++) {
        val = channel1.read();
        cout << val << "\n";

    }
}

int main() {
    thread t1(function1);
    thread t2(function2);
    t1.join();
    t2.join();
    return 0;
}

Output:

enter image description here

0
votes

Here is an unbuffered generic channel implementation. It was derived directly from @seccpur 's answer.

#include <iostream>
#include <thread>
#include <condition_variable>

using namespace std;

typedef unique_lock<mutex> mutex_lock;

template<class T>
class channel {
    T buffer;
    mutex buffer_mutex; // controls access to buffer
    condition_variable read_cond;
    condition_variable write_cond;
    bool data_avail = false;

public:
    void write(T data) {
        mutex_lock lock(buffer_mutex);
        write_cond.wait(lock, [&]() { return !data_avail; });
        buffer = data;
        data_avail = true;
        read_cond.notify_all();
    }

    T read() {
        mutex_lock lock(buffer_mutex);
        read_cond.wait(lock, [&]() { return data_avail; });
        T item = buffer;
        data_avail = false;
        write_cond.notify_all();
        return item;
    }
};

int main() {
    const int LENGTH = 30;

    channel<int> channel1;

    auto function1 = [&](){
        for (int i = 0; i < LENGTH; ++i) {
            channel1.write(i);
        }
    };

    auto function2 = [&](){
        for (int i = 0; i < LENGTH; ++i) {
            std::cout << 2 * channel1.read() << std::endl;
        }
    };

    thread t1(function1);
    thread t2(function2);
    t1.join();
    t2.join();
    return 0;
}