In a very simple producer/consumer test, my consumer always finds the queue empty, and I can't figure out what I'm doing wrong. It's a very small test program, so I hope someone with a sharp eye can help me out.

#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;

void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}

void produce() {
    srand (time(NULL));

    // produce every 5ms

    timespec ts = {0, 5*1000000};

    //
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.");
}

void consume() {

    // consume every 10ms

    timespec ts = {0, 10*1000000};

    //

    while (true) {
        std::lock_guard<std::mutex> lock(gMutex);
        std::string log;
        unsigned int N = gMsgs.size();

        // consume all data in queue at the moment

        for (unsigned int m = 0; m < N; ++m) {
            log += gMsgs[m]+"\n";
        }

        // remove already consumed data

        for(unsigned int m=0; m<N; ++m) {
            gMsgs.pop_front();
        }

        if (log.empty()) {
            log = "EMPTY";
        }

        printf("log: %s\n", log.c_str());
        nanosleep(&ts, NULL);
    }
}

int main()
{
  produce();
  consume();
}


My producer runs in a background thread and keeps pushing new strings onto the queue every 5 ms. My consumer runs in the main thread and takes data out of the queue at a slower rate, every 10 ms.

Expectation

Since the producer is faster than the consumer, I expect the queue to be non-empty whenever the consumer checks it, so the consumer should always have something to eat.

Observed

The printout is always "EMPTY", meaning there is nothing in the queue.

What's wrong?

UPDATE

I took the advice from @john and moved nanosleep out of the lock guard's scope, but the result is pretty much the same. See the updated code:

// Example program

#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;

void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}

void produce() {
    srand (time(NULL));

    int rnd = -1;
    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.\n");
}

void do_consume() {
    std::lock_guard<std::mutex> lock(gMutex);
    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }

    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}

void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
        nanosleep(&ts, NULL);
    }
}

int main()
{
  produce();
  consume();
}

And the result:

log thread created.
log: EMPTY
log: hello: 2

log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
Is the producer thread pushing anything in the queue? Try adding print statements at certain points for debugging. Instead of using two loops for processing the deque, you can use one loop only. while (! gMsgs.empty()) { std::string s = gMsgs.front(); gMsgs.pop_front(); } - kiner_shah
Usually a producer consumer has some method of notifying the consumer of the availability of data. I'm not seeing that here, so the consumer spins around and around, often not finding anything in the queue. - user4581301
I think the nanosleep in the consumer thread should be outside the scope of the lock guard. - john
Instead of using std::string for log use std::stringstream. - kiner_shah
@john, I think nanosleep is not required at all in the consumer thread. He could run an infinite while loop that checks whether the deque is non-empty and only then does the processing. That way the consumer knows as soon as the deque has an item and processes it immediately. - kiner_shah
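
One more thing worth checking in both versions of the question's code: the lambda in produce() captures the local ts by reference ([&]), but produce() returns right after starting the thread, so the thread keeps reading a timespec that no longer exists. That is undefined behavior, and it may well be why the producer seems to stall after the first message (nanosleep could be handed a garbage interval). Below is a minimal sketch of one way to avoid it, capturing the interval by value and using std::this_thread::sleep_for in place of nanosleep. startProducer and runProducerOnce are hypothetical helper names, and the loop is bounded only so that the example terminates.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

std::deque<std::string> gMsgs;  // same names as in the question
std::mutex gMutex;

// Hypothetical helper: pushes `count` messages, one every `interval`.
std::thread startProducer(int count, std::chrono::milliseconds interval) {
    assert(count >= 0);
    // Capture by value: the thread owns its own copies of count and interval,
    // so nothing dangles once startProducer() has returned.
    return std::thread([count, interval]() {
        for (int i = 0; i < count; ++i) {
            {
                std::lock_guard<std::mutex> lock(gMutex);
                gMsgs.push_back("hello: " + std::to_string(i));
            }
            // Sleep outside the lock so a consumer is never blocked by the pause.
            std::this_thread::sleep_for(interval);
        }
    });
}

// Hypothetical helper: runs one producer to completion and
// returns how many messages ended up in the queue.
std::size_t runProducerOnce(int count) {
    {
        std::lock_guard<std::mutex> lock(gMutex);
        gMsgs.clear();
    }
    std::thread t = startProducer(count, std::chrono::milliseconds(5));
    t.join();
    std::lock_guard<std::mutex> lock(gMutex);
    return gMsgs.size();
}
```

Calling runProducerOnce(4) should leave four messages in gMsgs. Moving ts to namespace scope would also remove the dangling reference; capturing by value just keeps the interval local to the thread.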

1 Answer

Just to echo what @user4581301 and @PaulMcKenzie suggested: std::condition_variable is indeed the way to go. I also removed the nanosleep() calls.

The updated code:

// Example program
#include <condition_variable>
#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::condition_variable gNotify;
bool gConsumerCanConsume = false;
std::thread gThread;

void updateLog() {
    std::unique_lock<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
    gConsumerCanConsume = true;
    lock.unlock();
    gNotify.notify_one();
}

void produce() {
    srand (time(NULL));

    // No sleep interval is needed any more, so the lambda captures nothing.
    gThread = std::thread([]() {
        while(true) {
            updateLog();
        }
    });
    printf("log thread created.\n");
}

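void do_consume() {
    std::string log;
    {
        // Hold the lock for the whole time the shared queue is touched.
        std::unique_lock<std::mutex> lock(gMutex);
        gNotify.wait(lock, []{return gConsumerCanConsume;});

        // Consume everything that is in the queue at the moment.
        for (const std::string& msg : gMsgs) {
            log += msg + "\n";
        }
        gMsgs.clear();

        // Reset the flag so the next call blocks until new data arrives.
        gConsumerCanConsume = false;
    }

    // Format and print outside the lock so the producer is not held up.
    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}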

void consume() {
    // do_consume() blocks on the condition variable, so no sleep is needed.
    while (true) {
        do_consume();
    }
}

int main()
{
  produce();
  consume();
}

And the result:


log thread created.
log: hello: 56
hello: 30
hello: 58
hello: 16
hello: 73
hello: 33
hello: 92
hello: 20
hello: 80
hello: 34
hello: 80
hello: 54
hello: 68
hello: 20
hello: 86
hello: 67

...
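
For reference, here is a sketch of a variant that terminates, so it can be run as a test. It assumes a hypothetical gDone flag and hypothetical helpers producer, consumeAll and runOnce, none of which are in the code above. The consumer waits on the queue state itself (non-empty or done) and swaps the whole queue out while holding the lock, so all work on the messages happens without the mutex held.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::condition_variable gNotify;
bool gDone = false;  // hypothetical: set once the producer has finished

// Hypothetical bounded producer: pushes `count` messages, then signals completion.
void producer(int count) {
    for (int i = 0; i < count; ++i) {
        {
            std::lock_guard<std::mutex> lock(gMutex);
            gMsgs.push_back("hello: " + std::to_string(i));
        }
        gNotify.notify_one();  // notify after releasing the lock
    }
    {
        std::lock_guard<std::mutex> lock(gMutex);
        gDone = true;
    }
    gNotify.notify_one();
}

// Drains the queue until the producer is done; returns how many messages were seen.
std::size_t consumeAll() {
    std::size_t consumed = 0;
    while (true) {
        std::deque<std::string> batch;
        {
            std::unique_lock<std::mutex> lock(gMutex);
            // The predicate checks the shared state directly, under the lock.
            gNotify.wait(lock, [] { return !gMsgs.empty() || gDone; });
            if (gMsgs.empty()) {
                break;  // woken by gDone and nothing is left to consume
            }
            batch.swap(gMsgs);  // take everything in one step
        }
        // The batch is private to this thread now; process it without the lock.
        consumed += batch.size();
    }
    return consumed;
}

// Hypothetical driver: one producer thread, consumer on the calling thread.
std::size_t runOnce(int count) {
    {
        std::lock_guard<std::mutex> lock(gMutex);
        gMsgs.clear();
        gDone = false;
    }
    std::thread t(producer, count);
    std::size_t n = consumeAll();
    t.join();
    assert(gMsgs.empty());  // safe to read: the producer thread has been joined
    return n;
}
```

runOnce(1000) should return 1000 however the two threads interleave, because every push and every swap happens under gMutex.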