146
votes

Is it true that C++0x will come without semaphores? There are already some questions on Stack Overflow regarding the use of semaphores. I use them (posix semaphores) all the time to let a thread wait for some event in another thread:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

If I would do that with a mutex:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Problem: It's ugly and it's not guaranteed that thread1 locks the mutex first (Given that the same thread should lock and unlock a mutex, you also can't lock event1 before thread0 and thread1 started).

So since boost doesn't have semaphores either, what is the simplest way to achieve the above?

11
Maybe use the condition mutex and std::promise and std::future?Yves

11 Answers

199
votes

You can easily build one from a mutex and a condition variable:

#include <mutex>
#include <condition_variable>

class semaphore {
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void release() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void acquire() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_acquire() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
110
votes

Based on Maxim Yegorushkin's answer, I tried to make the example in C++11 style.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
38
votes

I decided to write the most robust/generic C++11 semaphore I could, in the style of the standard as much as I could (note using semaphore = ..., you normally would just use the name semaphore similar to normally using string not basic_string):

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
15
votes

in acordance with posix semaphores, I would add

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

And I much prefer using a synchronisation mechanism at a convenient level of abstraction, rather than always copy pasting a stitched-together version using more basic operators.

11
votes

C++20 finally has semaphores - std::counting_semaphore<max_count>.

These have (at least) the following methods:

  • acquire() (blocking)
  • try_acquire() (non-blocking, returns immediately)
  • try_acquire_for() (non-blocking, takes a duration)
  • try_acquire_until() (non-blocking, takes a time at which to stop trying)
  • release()

You can read these CppCon 2019 presentation slides, or watch the video. There's also the official proposal P0514R4, but it may not be up-to-date with actual C++20.

9
votes

You can also check out cpp11-on-multicore - it has a portable and optimal semaphore implementation.

The repository also contains other threading goodies that complement c++11 threading.

8
votes

You can work with mutex and condition variables. You gain exclusive access with the mutex, check whether you want to continue or need to wait for the other end. If you need to wait, you wait in a condition. When the other thread determines that you can continue, it signals the condition.

There is a short example in the boost::thread library that you can most probably just copy (the C++0x and boost thread libs are very similar).

3
votes

Also can be useful RAII semaphore wrapper in threads:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Usage example in multithread app:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
2
votes

I found the shared_ptr and weak_ptr, a long with a list, did the job I needed. My issue was, I had several clients wanting to interact with a host's internal data. Typically, the host updates the data on it's own, however, if a client requests it, the host needs to stop updating until no clients are accessing the host data. At the same time, a client could ask for exclusive access, so that no other clients, nor the host, could modify that host data.

How I did this was, I created a struct:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Each client would have a member of such:

UpdateLock::ptr m_myLock;

Then the host would have a weak_ptr member for exclusivity, and a list of weak_ptrs for non-exclusive locks:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

There is a function to enable locking, and another function to check if the host is locked:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

I test for locks in LockUpdate, IsUpdateLocked, and periodically in the host's Update routine. Testing for a lock is as simple as checking if the weak_ptr's expired, and removing any expired from the m_locks list (I only do this during the host update), I can check if the list is empty; at the same time, I get automatic unlocking when a client resets the shared_ptr they are hanging onto, which also happens when a client gets destroyed automatically.

The over all effect is, since clients rarely need exclusivity (typically reserved for additions and deletions only), most of the time a request to LockUpdate( false ), that is to say non-exclusive, succeeds so long as (! m_exclusiveLock). And a LockUpdate( true ), a request for exclusivity, succeeds only when both (! m_exclusiveLock) and (m_locks.empty()).

A queue could be added to mitigate between exclusive and non-exclusive locks, however, I have had no collisions thus far, so I intend to wait until that happens to add the solution (mostly so I have a real-world test condition).

So far this is working well for my needs; I can imagine the need to expand this, and some issues that might arise over expanded use, however, this was quick to implement, and required very little custom code.

-2
votes

Different from other answers, I propose a new version which:

  1. Unblocks all waiting threads before being deleted. In this case, deleting the semaphore will wake up all waiting threads and only after everybody wakes up, the semaphore destructor will exit.
  2. Has a parameter to the wait() call, to automatically unlock the calling thread after the timeout in milliseconds has passed.
  3. Has an options on the construtor to limit available resources count only up to the count the semaphore was initialized with. This way, calling notify() too many times will not increase how many resources the semaphore has.
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

std::recursive_mutex g_sync_mutex;
#define sync(x) do { \
        std::unique_lock<std::recursive_mutex> lock(g_sync_mutex); \
        x; \
    } while (false);

class Semaphore {
    int _count;
    bool _limit;
    int _all_resources;
    int _wakedup;
    std::mutex _mutex;
    std::condition_variable_any _condition_variable;

public:
    /**
     * count - how many resources this semaphore holds
     * limit - limit notify() calls only up to the count value (available resources)
     */
    Semaphore (int count, bool limit)
        : _count(count),
        _limit(limit),
        _all_resources(count),
        _wakedup(count)
    {
    }

    /**
     * Unlock all waiting threads before destructing the semaphore (to avoid their segfalt later)
     */
    virtual ~Semaphore () {
        std::unique_lock<std::mutex> lock(_mutex);
        _wakeup(lock);
    }

    void _wakeup(std::unique_lock<std::mutex>& lock) {
        int lastwakeup = 0;

        while( _wakedup < _all_resources ) {
            lock.unlock();
            notify();
            lock.lock();
            // avoids 100% CPU usage if someone is not waking up properly
            if (lastwakeup == _wakedup) {
                std::this_thread::sleep_for( std::chrono::milliseconds(10) );
            }
            lastwakeup = _wakedup;
        }
    }

    // Mutex and condition variables are not movable and there is no need for smart pointers yet
    Semaphore(const Semaphore&) = delete;
    Semaphore& operator =(const Semaphore&) = delete;
    Semaphore(const Semaphore&&) = delete;
    Semaphore& operator =(const Semaphore&&) = delete;

    /**
     * Release one acquired resource.
     */
    void notify()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling notify(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        _count++;
        if (_limit && _count > _all_resources) {
            _count = _all_resources;
        }
        _condition_variable.notify_one();
    }

    /**
     * This function never blocks!
     * Return false if it would block when acquiring the lock. Otherwise acquires the lock and return true.
     */
    bool try_acquire() {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling try_acquire(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        if(_count <= 0) {
            return false;
        }
        _count--;
        return true;
    }

    /**
     * Return true if the timeout expired, otherwise return false.
     * timeout - how many milliseconds to wait before automatically unlocking the wait() call.
     */
    bool wait(int timeout = 0) {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling wait(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        _count--;
        _wakedup--;
        try {
            std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();

            while(_count < 0) {
                if (timeout < 1) {
                    _condition_variable.wait(lock);
                }
                else {
                    std::cv_status status = _condition_variable.wait_until(lock, timenow + std::chrono::milliseconds(timeout));

                    if ( std::cv_status::timeout == status) {
                        _count++;
                        _wakedup++;
                        return true;
                    }
                }
            }
        }
        catch (...) {
            _count++;
            _wakedup++;
            throw;
        }
        _wakedup++;
        return false;
    }

    /**
     * Return true if calling wait() will block the calling thread
     */
    bool locked() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count <= 0;
    }

    /**
     * Return true the semaphore has at least all resources available (since when it was created)
     */
    bool freed() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count >= _all_resources;
    }

    /**
     * Return how many resources are available:
     * - 0 means not free resources and calling wait() will block te calling thread
     * - a negative value means there are several threads being blocked
     * - a positive value means there are no threads waiting
     */
    int count() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count;
    }

    /**
     * Wake everybody who is waiting and reset the semaphore to its initial value.
     */
    void reset() {
        std::unique_lock<std::mutex> lock(_mutex);
        if(_count < 0) {
            _wakeup(lock);
        }
        _count = _all_resources;
    }
};

Utility to print the current timestamp:

std::string getTime() {
    char buffer[20];
#if defined( WIN32 )
    SYSTEMTIME wlocaltime;
    GetLocalTime(&wlocaltime);
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
    std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
    duration -= hours;
    auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
    duration -= minutes;
    auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
    duration -= seconds;
    auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
    duration -= milliseconds;
    time_t theTime = time( NULL );
    struct tm* aTime = localtime( &theTime );
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
    return buffer;
}

Example program using this semaphore:

// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
    std::cerr << getTime() << "Creating Semaphore" << std::endl;
    Semaphore* semaphore = new Semaphore(1, false);
    semaphore->wait(1000);
    semaphore->wait(1000);
    std::cerr << getTime() << "Auto Unlocking Semaphore wait" << std::endl;

    std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
    delete semaphore;

    std::cerr << getTime() << "Exiting after 10 seconds..." << std::endl;
    return 0;
}

Example output:

11:03:01.012 Creating Semaphore
11:03:02.012 Auto Unlocking Semaphore wait
11:03:07.012 Exiting after 10 seconds...

Extra function which uses a EventLoop to unlock the semaphores after some time:

std::shared_ptr<std::atomic<bool>> autowait(Semaphore* semaphore, int timeout, EventLoop<std::function<void()>>& eventloop, const char* source) {
    std::shared_ptr<std::atomic<bool>> waiting(std::make_shared<std::atomic<bool>>(true));
    sync(std::cerr << getTime() << "autowait '" << source << "'..." << std::endl);

    if (semaphore->try_acquire()) {
        eventloop.enqueue( timeout, [waiting, source, semaphore]{
            if ( (*waiting).load() ) {
                sync(std::cerr << getTime() << "Timeout '" << source << "'..." << std::endl);
                semaphore->notify();
            }
        } );
    }
    else {
        semaphore->wait(timeout);
    }
    return waiting;
}

Semaphore semaphore(1, false);
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::shared_ptr<std::atomic<bool>> waiting_something = autowait(&semaphore, 45000, eventloop, "waiting_something");
-4
votes

In case someone is interested in the atomic version, here is the implementation. The performance is expected better than the mutex & condition variable version.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};