3 votes

I have a pre-defined number of tasks (objects of Task) and a likewise pre-defined number of threads.

Let's assume 8 threads and 32 tasks.

My current solution allows me to run 8 tasks in parallel, and once they're finished, run another 8 in parallel.

With this solution it can happen that 7 finished threads sit idle, waiting for the slowest task.

I'd like a solution where a thread that finishes its task immediately picks up a new one, until none are left.

My current implementation:

vector <Task> tasks; //the 32 tasks
unsigned int maxThreadCount(8);

// the loop bound must cover the whole vector (the original
//  tasks.size() - maxThreadCount skipped the last batch and could
//  underflow), and the inner loop must stop early on a partial batch
for ( unsigned int i = 0; i < tasks.size(); i += maxThreadCount )
{
    vector <thread> threads;
    for (unsigned int j = 0; j < maxThreadCount && i + j < tasks.size(); ++j)
        threads.push_back (thread(&Task::some_function, &tasks[i+j]));

    for ( auto &t : threads)
        t.join();
}
4 Sounds like you want to use the producer/consumer pattern. – cdhowie

4 Answers

3 votes

This is a bit of a hack, and obviously specific to your needs. The idea is to have the threads keep pulling tasks and invoking your member function until the task list is exhausted, at which point each thread terminates on making that discovery.

Something like this:

static void ThreadProc(
    std::vector<Task>& tasks,
    std::atomic_uint& idx, 
    void (Task::*member)())
{
    unsigned int i = 0;
    while ((i = idx++) < tasks.size())
        (tasks[i].*member)();
}

void RunTasks(
    std::vector<Task>& tasks, 
    unsigned int n_threads = std::max(std::thread::hardware_concurrency(),1U))
{
    // adjust number of threads to be no larger than the task set
    //  there is no reason to spin up new threads that will have
    //   nothing to do but exit.
    if (tasks.size() < n_threads)
        n_threads = tasks.size();

    // shared index into the task container        
    std::atomic_uint idx{0};

    // spin up the work crew
    std::vector<std::thread> threads;
    threads.reserve(n_threads);
    while (n_threads--)
    {
        threads.emplace_back(ThreadProc, 
                             std::ref(tasks), 
                             std::ref(idx), 
                             &Task::some_function);
    }

    for (auto &t : threads)
        t.join();
}

For a fixed set of tasks this is about as simple as a thread-proc invoke-member model can get. It requires no additional 'queue' container; the task vector is sufficient. And it requires no lock objects, using a much lighter atomic instead.

Obviously this all goes out the window if you need enhanced container access, such as appending new items to the task vector mid-task, etc. A different approach would be required for something like that, but for a single-shot work-crew approach to a fixed list of tasks this is hard to beat. From your description this is all you need.

Best of luck.

3 votes

Have you considered using a queue to store the Tasks, and having each thread pop a Task from the queue to process it? Something like this:

http://vichargrave.com/multithreaded-work-queue-in-c/
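A minimal, standard-library-only sketch of that idea (the `Task` struct here is an illustrative stand-in for the asker's class, and `RunQueued` is a hypothetical name):

```cpp
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for the asker's Task class (assumption for illustration)
struct Task { int done = 0; void some_function() { done = 1; } };

// Mutex-protected queue of pointers into the task vector
class WorkQueue {
    std::queue<Task*> q_;
    std::mutex m_;
public:
    void push(Task* t) { std::lock_guard<std::mutex> lk(m_); q_.push(t); }
    Task* pop() {
        std::lock_guard<std::mutex> lk(m_);
        if (q_.empty()) return nullptr;   // nullptr signals "no work left"
        Task* t = q_.front(); q_.pop(); return t;
    }
};

void RunQueued(std::vector<Task>& tasks, unsigned int n_threads) {
    WorkQueue queue;
    for (auto& t : tasks) queue.push(&t);   // all tasks queued up front

    std::vector<std::thread> threads;
    for (unsigned int i = 0; i < n_threads; ++i)
        threads.emplace_back([&queue] {
            // each thread pops and runs tasks until the queue drains
            while (Task* t = queue.pop())
                t->some_function();
        });

    for (auto& t : threads)
        t.join();
}
```

Since all tasks are queued before the workers start, no condition variable is needed; an empty queue simply means the work is done.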

1 vote

With Boost.Asio this is very easy to do. Just call the io_service::run method from 8 threads and then you can io_service::post your work items to the io_service. An example can be found here.

1 vote

Why not use a full-fledged parallel-processing library or language extension, like OpenMP or Cilk Plus? These are quite portable now, with support from the main OSes and compilers.

So you don't have to reinvent the wheel; let the libraries take care of the threads and the load balancing, and keep you out of traps like this one: Why is OpenMP outperforming threads?

For example, the vector of tasks can be run in parallel, with the default number of threads, as simply as:

cilk_for (int i = 0; i < tasks.size(); i++)
    tasks[i].some_function();

You can also change the number of threads if needed.
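The OpenMP equivalent of the cilk_for above is a sketch like the following (the Task struct is an illustrative stand-in; with OpenMP disabled the pragma is simply ignored and the loop runs serially, so the code stays correct either way):

```cpp
#include <vector>

// Stand-in for the asker's Task class (assumption for illustration)
struct Task { int done = 0; void some_function() { done = 1; } };

void RunParallelFor(std::vector<Task>& tasks) {
    // With OpenMP enabled (e.g. -fopenmp) the iterations are divided
    // among a team of threads, and the runtime handles load balancing.
    #pragma omp parallel for
    for (int i = 0; i < (int)tasks.size(); i++)
        tasks[i].some_function();
}
```

The thread count can then be controlled from outside the code, e.g. via the OMP_NUM_THREADS environment variable.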