
I am working on a thread pool with work-stealing capabilities, but whenever the program tries to lock the mutex in the work queue, I get a runtime exception.

I've tried the program on both Windows (Visual Studio 2015) and Ubuntu 14.04, and both produce a runtime exception.

I've tested the work queue extensively by itself and can't reproduce the error. If I comment out the try_steal function, I do not run into any errors. Lastly, I've replaced std::mutex with std::recursive_mutex and I still get the same error.

I thought the exception occurred during the destruction of the thread pool, namely one thread trying to read the work queue of another thread that had already been destroyed. But even after adding an infinite loop before the end of the program, the same exception occurred.

Is there something else I should be checking? Below you will find the relevant code and the VS 2015 and Linux call stacks.

Thanks for all your help.

Windows call stack:

msvcp140d.dll!mtx_do_lock(_Mtx_internal_imp_t * mtx, const xtime * target)
msvcp140d.dll!_Mtx_lock(_Mtx_internal_imp_t * mtx)
thread_pool_test.exe!std::_Mtx_lockX(_Mtx_internal_imp_t * _Mtx)
thread_pool_test.exe!std::_Mutex_base::lock()
thread_pool_test.exe!std::lock_guard::lock_guard(std::mutex & _Mtx)
thread_pool_test.exe!work_stealing_queue::try_steal(function_wrapper & res)
thread_pool_test.exe!thread_pool_steal::pop_task_from_other_thread_queue(function_wrapper & task)
thread_pool_test.exe!thread_pool_steal::run_pending_task()
thread_pool_test.exe!thread_pool_steal::worker_thread(unsigned int my_index_)
thread_pool_test.exe!std::_Invoker_pmf_pointer::_Call(void (unsigned int) * _Pmf, thread_pool_steal * && _Arg1, int && <_Args2_0>) Line 1373 C++
thread_pool_test.exe!std::invoke(void (unsigned int) * && _Obj, thread_pool_steal * && <_Args_0>, int && <_Args_1>)
thread_pool_test.exe!std::_LaunchPad,std::default_delete > > >::_Execute<0,1,2>(std::tuple & _Tup, std::integer_sequence __formal)
thread_pool_test.exe!std::_LaunchPad,std::default_delete > > >::_Run(std::_LaunchPad,std::default_delete > > > * _Ln)
thread_pool_test.exe!std::_LaunchPad,std::default_delete > > >::_Go()
thread_pool_test.exe!std::_Pad::_Call_func(void * _Data)
ucrtbased.dll!0fa27e48()
[Frames below may be incorrect and/or missing, no symbols loaded for ucrtbased.dll]
ucrtbased.dll!0fa27b8b()
kernel32.dll!@BaseThreadInitThunk@12 ()
ntdll.dll!___RtlUserThreadStart@8 ()
ntdll.dll!__RtlUserThreadStart@8 ()

Linux call stack:

[New Thread 0x7ffff6f5d700 (LWP 4395)]

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff6f5d700 (LWP 4395)]
__GI___pthread_mutex_lock (mutex=0x0)
    at ../nptl/pthread_mutex_lock.c:66
66  ../nptl/pthread_mutex_lock.c: No such file or directory.
(gdb) bt
#0  __GI___pthread_mutex_lock (mutex=0x0)
    at ../nptl/pthread_mutex_lock.c:66
#1  0x0000000000401f53 in __gthread_mutex_lock (__mutex=0x50)
    at /usr/include/x86_64-linux-gnu/c++/4.9/bits/gthr-default.h:748
#2  0x00000000004023ba in std::mutex::lock (this=0x50)
    at /usr/include/c++/4.9/mutex:135
#3  0x000000000040370a in std::lock_guard<std::mutex>::lock_guard (
    this=0x7ffff6f5cd10, __m=...) at /usr/include/c++/4.9/mutex:377
#4  0x00000000004030fa in work_stealing_queue::try_steal (this=0x0, 
    res=...) at Source.cpp:250
#5  0x00000000004032c8 in thread_pool_steal::pop_task_from_other_thread_queue (this=0x7fffffffdac0, task=...) at Source.cpp:302
#6  0x00000000004035e4 in thread_pool_steal::run_pending_task (
    this=0x7fffffffdac0) at Source.cpp:358
#7  0x00000000004031ba in thread_pool_steal::worker_thread (
    this=0x7fffffffdac0, my_index_=0) at Source.cpp:283
#8  0x000000000040d3d4 in std::_Mem_fn<void (thread_pool_steal::*)(unsigned int)>::operator()<int, void>(thread_pool_steal*, int&&) const (
    this=0x62af78, __object=0x7fffffffdac0)
    at /usr/include/c++/4.9/functional:569
#9  0x000000000040cec9 in std::_Bind_simple<std::_Mem_fn<void (thread_pool_steal::*)(unsigned int)> (thread_pool_steal*, int)>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (this=0x62af68)
    at /usr/include/c++/4.9/functional:1700
#10 0x000000000040c87f in std::_Bind_simple<std::_Mem_fn<void (thread_pool_steal::*)(unsigned int)> (thread_pool_steal*, int)>::operator()() (
    this=0x62af68) at /usr/include/c++/4.9/functional:1688
#11 0x000000000040c4ea in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (thread_pool_steal::*)(unsigned int)> (thread_pool_steal*, int)> >::_M_run() (this=0x62af50) at /usr/include/c++/4.9/thread:115
#12 0x00007ffff78f7e40 in ?? ()
   from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#13 0x00007ffff7bc4182 in start_thread (arg=0x7ffff6f5d700)
    at pthread_create.c:312
#14 0x00007ffff735e47d in clone ()
    at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Code:

// function_wrapper, threadsafe_queue and join_threads are defined elsewhere (not shown).

class work_stealing_queue
{
private:
    typedef function_wrapper data_type;
    std::deque<data_type> the_queue;
    mutable std::mutex the_mutex;

public:
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        return the_queue.empty();
    }

    bool try_steal(data_type& res)
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty())
        {
            return false;
        }

        res = std::move(the_queue.back());
        the_queue.pop_back();
        return true;
    }
};


class thread_pool_steal
{
    typedef function_wrapper task_type;

    std::atomic_bool done;
    threadsafe_queue<task_type> pool_work_queue;
    std::vector<std::unique_ptr<work_stealing_queue> > queues;
    std::vector<std::thread> threads;

    static thread_local work_stealing_queue* local_work_queue;
    static thread_local unsigned int my_index;

    join_threads joiner;

    bool pop_task_from_other_thread_queue(task_type& task)
    {
        for (unsigned i = 0; i < queues.size(); ++i)
        {
            unsigned const index = (my_index + i + 1) % queues.size();
            if (queues[index]->try_steal(task))
            {
                return true;
            }
        }
        return false;
    }

public:
    thread_pool_steal() : done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try
        {
            for (auto i = 0; i < thread_count; ++i)
            {
                queues.push_back(std::unique_ptr<work_stealing_queue>(std::make_unique<work_stealing_queue>()));
                threads.push_back(std::thread(&thread_pool_steal::worker_thread, this, i));
            }
        }
        catch (...)
        {
            done = true;
            throw;
        }
    }

    ~thread_pool_steal()
    {
        done = true;
    }

    // worker_thread(), run_pending_task() and the remaining members are not shown.
};
Probably has something to do with thread-safe initialization of a static mutex object. I am struggling with a similar problem now. Are you compiling/linking with -pthread? (iggy)
@iggy Yes, when I compile on a Linux computer, I use pthreads. Thanks for the tip; I will look into whether the error has to do with static initialization. (Casey_StF)
Actually, the backtrace says work_stealing_queue::try_steal (this=0x0, res=...), so you are calling it on a null object. (iggy)
Compile it with -fsanitize=address and see what you get. (iggy)
Compiled with the following: g++ -std="c++14" -g -fsanitize=address Source.cpp -o test -lpthread and I get (Casey_StF)

2 Answers


In Linux, "the_mutex" must be initialized before calling pthread_mutex_lock(). See Unix Network Programming Vol.1 2nd Ed. (W. Richard Stevens) p.626. It says "If a mutex variable is statically allocated, we must initialize it to the constant PTHREAD_MUTEX_INITIALIZER." It will look like this:

pthread_mutex_t the_mutex = PTHREAD_MUTEX_INITIALIZER;

It appears this error occurs because std::vector is not thread-safe. While the main thread is adding to the vector of work_stealing_queue, queues, the newly spawned threads call queues.size(). My guess is that size() is incremented before the newest work_stealing_queue in queues is ready. The solution is to use two separate loops: one to create all the work_stealing_queue objects in queues, and another to start the threads running worker_thread.

for (auto i = 0; i < thread_count; ++i)
{
    queues.push_back(std::unique_ptr<work_stealing_queue>(std::make_unique<work_stealing_queue>()));
}

for (auto i = 0; i < thread_count; ++i) // Separate loop because std::vector is not thread-safe and causes issues when size() is used
{
    threads.push_back(std::thread(&thread_pool_steal::worker_thread, this, i));
}