156
votes

Relevant questions:

About C++11:

About Boost:


How do I get a pool of threads to send tasks to, without creating and deleting them over and over again? This means persistent threads to resynchronize without joining.


I have code that looks like this:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

Instead of creating and joining threads each iteration, I'd prefer to send tasks to my worker threads each iteration and only create them once.

10
here's a related question and my answer.didierc
thought about using tbb (it's Intel, but free & open source, and does exactly what you want: you simply submit (recursively divisible) tasks and don't worry about the threads)?Walter
This FOSS project is my attempt to create a thread pool library, check it out if you want. -> code.google.com/p/threadpool11Etherealone
What's wrong with using tbb?Walter

10 Answers

127
votes

This is copied from my answer to another very similar post, hope it can help:

  1. Start with maximum number of threads a system can support:

    int Num_Threads = thread::hardware_concurrency();

  2. For an efficient threadpool implementation, once threads are created according to Num_Threads, it's better not to create new ones, or destroy old ones (by joining). There will be performance penalty, might even make your application goes slower than the serial version.

Each C++11 thread should be running in their function with an infinite loop, constantly waiting for new tasks to grab and run.

Here is how to attach such function to the thread pool:

int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{  Pool.push_back(thread(Infinite_loop_function));}
  1. The Infinite_loop_function

This is a "while(true)" loop waiting for the task queue

void The_Pool:: Infinite_loop_function()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);

            condition.wait(lock, [this](){return !Queue.empty() || terminate_pool;});
            Job = Queue.front();
            Queue.pop();
        }
        Job(); // function<void()> type
    }
};
  1. Make a function to add job to your Queue
void The_Pool:: Add_Job(function<void()> New_Job)
{
    {
         unique_lock<mutex> lock(Queue_Mutex);
         Queue.push(New_Job);
    }
        condition.notify_one();
}
  1. Bind an arbitrary function to your Queue
    Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for job to do.

I apologize if there are some syntax errors, I typed these code and and I have a bad memory. Sorry that I cannot provide you the complete thread pool code, that would violate my job integrity.

Edit: to terminate the pool, call the shutdown() method:

XXXX::shutdown()
{
    {
        unique_lock<mutex> lock(threadpool_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }
    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for(std::thread &every_thread : thread_vector)
    {   every_thread.join();}

    thread_vector.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}
92
votes

You can use C++ Thread Pool Library, https://github.com/vit-vit/ctpl.

Then the code your wrote can be replaced with the following

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

You will get the desired number of threads and will not create and delete them over and over again on the iterations.

71
votes

A pool of threads means that all your threads are running, all the time – in other words, the thread function never returns. To give the threads something meaningful to do, you have to design a system of inter-thread communication, both for the purpose of telling the thread that there's something to do, as well as for communicating the actual work data.

Typically this will involve some kind of concurrent data structure, and each thread would presumably sleep on some kind of condition variable, which would be notified when there's work to do. Upon receiving the notification, one or several of the threads wake up, recover a task from the concurrent data structure, process it, and store the result in an analogous fashion.

The thread would then go on to check whether there's even more work to do, and if not go back to sleep.

The upshot is that you have to design all this yourself, since there isn't a natural notion of "work" that's universally applicable. It's quite a bit of work, and there are some subtle issues you have to get right. (You can program in Go if you like a system which takes care of thread management for you behind the scenes.)

20
votes

A threadpool is at core a set of threads all bound to a function working as an event loop. These threads will endlessly wait for a task to be executed, or their own termination.

The threadpool job is to provide an interface to submit jobs, define (and perhaps modify) the policy of running these jobs (scheduling rules, thread instantiation, size of the pool), and monitor the status of the threads and related resources.

So for a versatile pool, one must start by defining what a task is, how it is launched, interrupted, what is the result (see the notion of promise and future for that question), what sort of events the threads will have to respond to, how they will handle them, how these events shall be discriminated from the ones handled by the tasks. This can become quite complicated as you can see, and impose restrictions on how the threads will work, as the solution becomes more and more involved.

The current tooling for handling events is fairly barebones(*): primitives like mutexes, condition variables, and a few abstractions on top of that (locks, barriers). But in some cases, these abstrations may turn out to be unfit (see this related question), and one must revert to using the primitives.

Other problems have to be managed too:

  • signal
  • i/o
  • hardware (processor affinity, heterogenous setup)

How would these play out in your setting?

This answer to a similar question points to an existing implementation meant for boost and the stl.

I offered a very crude implementation of a threadpool for another question, which doesn't address many problems outlined above. You might want to build up on it. You might also want to have a look of existing frameworks in other languages, to find inspiration.


(*) I don't see that as a problem, quite to the contrary. I think it's the very spirit of C++ inherited from C.

9
votes
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
7
votes

You can use thread_pool from boost library:

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

You also can use threadpool from open source community:

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}
3
votes

Something like this might help (taken from a working app).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

You can use it like this:

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Keep in mind that reinventing an efficient asynchronous queuing mechanism is not trivial.

Boost::asio::io_service is a very efficient implementation, or actually is a collection of platform-specific wrappers (e.g. it wraps I/O completion ports on Windows).

3
votes

Edit: This now requires C++17 and concepts. (As of 9/12/16, only g++ 6.0+ is sufficient.)

The template deduction is a lot more accurate because of it, though, so it's worth the effort of getting a newer compiler. I've not yet found a function that requires explicit template arguments.

It also now takes any appropriate callable object (and is still statically typesafe!!!).

It also now includes an optional green threading priority thread pool using the same API. This class is POSIX only, though. It uses the ucontext_t API for userspace task switching.


I created a simple library for this. An example of usage is given below. (I'm answering this because it was one of the things I found before I decided it was necessary to write it myself.)

bool is_prime(int n){
  // Determine if n is prime.
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

You can pass async any function with any (or void) return value and any (or no) arguments and it will return a corresponding std::future. To get the result (or just wait until a task has completed) you call get() on the future.

Here's the github: https://github.com/Tyler-Hardin/thread_pool.

3
votes

This is another thread pool implementation that is very simple, easy to understand and use, uses only C++11 standard library, and can be looked at or modified for your uses, should be a nice starter if you want to get into using thread pools:

https://github.com/progschj/ThreadPool

1
votes

A threadpool with no dependencies outside of STL is entirely possible. I recently wrote a small header-only threadpool library to address the exact same problem. It supports dynamic pool resizing (changing the number of workers at runtime), waiting, stopping, pausing, resuming and so on. I hope you find it useful.