
I have implemented a thread pool. It currently supports the following basic operations:

void initialise( bool detached_threads );

bool dispatch( void *(*dispatch_fn)(void*) , void * arg , bool free_arg );

void shut_down();

static void * execute_task( void * arg );

Now I would like to add a wait() operation, to be called by the main thread, that blocks until every task handed to the pool has finished executing. I don't want to use pthread_join, because it returns only once a thread has terminated, and I don't want to tear the pool down and create it again. I implemented wait() as shown in the code below, but it does not seem to be correct.

Please suggest what is wrong. Thanks!

#include "../inc/ThreadPool.hpp"

#include <cstdio>
#include <cstdlib>
#include <iostream>

using namespace std;

ThreadPool::ThreadPool( unsigned int n )
:num_threads(n)
{
    if(num_threads<=0)
    {
        num_threads = DEFAULT_THREAD_POOL_SIZE;
    }

    barrier_count = 0;
    threads = (pthread_t*) malloc(sizeof(pthread_t)*num_threads);
    shutdown = false;
    dont_accept = false;

    pthread_mutex_init(&barrier_lock,NULL);
    pthread_cond_init(&barrier_reached,NULL);

    pthread_mutex_init(&q_lock,NULL);
    pthread_cond_init(&q_not_empty,NULL);
    pthread_cond_init(&q_empty,NULL);
}

ThreadPool::~ThreadPool()
{
  //cout << "~ThreadPool()" << endl; 
}

void ThreadPool::initialise( bool detached_threads )
{
    //pthread_attr_t attr;

    //if(detached_threads)
    //{
    //pthread_attr_init(&attr);
    //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //}

    for ( int i = 0 ; i<num_threads ; i++ )
    {
      pthread_create( &threads[i] , NULL , execute_task , this );
    }
}

bool ThreadPool::dispatch( void *(*routine)(void*) , void * arg , bool free_arg )
{
    task_t * new_task = (task_t*) malloc(sizeof(task_t));
    new_task->routine = routine;
    new_task->arg = arg;
    new_task->free_arg = free_arg;

    pthread_mutex_lock(&q_lock);

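    if(dont_accept)
    {
        // Release the queue lock before bailing out; returning with it
        // held would block every later dispatch() and all worker threads.
        pthread_mutex_unlock(&q_lock);
        free(new_task);
        return false;
    }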

    bool was_empty = tasks.empty();
    tasks.push(new_task);

    if(was_empty)
    {
        pthread_cond_signal(&q_not_empty);
    }

    pthread_mutex_unlock(&q_lock);

    return true;
}

void ThreadPool::shut_down()
{
      void * return_val;
      pthread_mutex_lock(&q_lock);
      dont_accept = true;

      while(!(tasks.empty()))
      {
          pthread_cond_wait(&q_empty,&q_lock);
      }

      shutdown = true;
      pthread_cond_broadcast(&q_not_empty);
      pthread_mutex_unlock(&q_lock);

      for(int i=0 ; i<num_threads ; i++)
      {
          pthread_join(threads[i],&return_val);
      }

      free(threads);

      pthread_mutex_destroy(&barrier_lock);
      pthread_cond_destroy(&barrier_reached);

      pthread_mutex_destroy(&q_lock);
      pthread_cond_destroy(&q_empty);
      pthread_cond_destroy(&q_not_empty);
}

void ThreadPool::init_barrier()
{
    pthread_mutex_lock(&barrier_lock);

    barrier_count = 0;

    pthread_mutex_unlock(&barrier_lock);
}

void ThreadPool::barrier( int ns )
{
    pthread_mutex_lock(&barrier_lock);

    barrier_count++;

    if(barrier_count==ns)
    {
        for( int i=0 ; i<ns ; i++ )
        {
            pthread_cond_signal(&barrier_reached);
        }
    }else
    {
        while( barrier_count<ns )
        {
            pthread_cond_wait(&barrier_reached,&barrier_lock);
        }
    }

    pthread_mutex_unlock(&barrier_lock);
}

void ThreadPool::wait()
{
      pthread_mutex_lock(&q_lock);

      while(!(tasks.empty()))
      {
          pthread_cond_wait(&q_empty,&q_lock);
      }

      pthread_mutex_unlock(&q_lock);
}


void * ThreadPool::execute_task( void * arg )
{
    ThreadPool * thread_pool = (ThreadPool*) arg;
    task_t * cur_task;

    while(true)
    {
    pthread_mutex_lock(&(thread_pool->q_lock));

    while((thread_pool->tasks).empty())
    {
          if(thread_pool->shutdown)
          {
          pthread_mutex_unlock(&(thread_pool->q_lock));
          pthread_exit(NULL);
          }

          //cout << "I'm going to sleep!!!" << endl;

          pthread_cond_wait(&(thread_pool->q_not_empty),&(thread_pool->q_lock));

          //cout << "I've woken up!!!" << endl;

          if(thread_pool->shutdown)
          {
          pthread_mutex_unlock(&(thread_pool->q_lock));
          pthread_exit(NULL);
          }
    }

    cur_task = thread_pool->tasks.front();
    thread_pool->tasks.pop();

    if(thread_pool->tasks.empty() && !thread_pool->shutdown )
    {
          pthread_cond_signal(&(thread_pool->q_empty));
    }

    pthread_mutex_unlock(&(thread_pool->q_lock));

    //cout << "I'm executing a task!!!" << endl;

    (cur_task->routine)(cur_task->arg);

    if(cur_task->free_arg)
    {
           free(cur_task->arg);
    }

    free(cur_task);

    //cout << "I'm done with the task!!!" << endl;
    }
}
This doesn't sound good, a threadpool should never wait for all threads to complete. That stops it from doing its job. An app that uses the threadpool may well be interested in a certain set of tp-threads to complete, it implements that itself by having the threads it acquired from the pool signal their completion. At best you might want to do this when the app terminates. Don't do that either, very high odds for deadlock. (Hans Passant)
@Hans Passant I am aware of that. But the application I am developing needs a thread pool with the wait() operation. My objective is not implementing a thread pool with such behavior, but is using such an operation in my application. (cpp_noname)

1 Answer


Well, what I usually do is request a 'TasksetWait' (TW) object from the threadpool, issue tasks through its 'dispatch' method and then, for synchronous notification, call its 'AwaitCompletion()' method. The TW is supplied with a private mutex that is already locked on behalf of the requesting thread (ensuring it has exclusive access for now), an int task counter, a 'completed' condvar/event for the requester to wait on, and a reference to its pool. The TW's dispatch forwards each task to the pool: it loads a reference to itself into the task, pushes the task onto the pool, and counts it out by incrementing the task counter.

The requesting thread then calls TW->AwaitCompletion, which unlocks the mutex and waits on the event.

Meanwhile, pool threads are executing the tasks' run() methods. After run() returns, the task calls the TW's 'OnCompletion()' method, which locks the mutex and decrements the count. If the count is still non-zero, it unlocks the mutex and exits. If the count is zero, it unlocks the mutex, signals the event and exits.
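The counting scheme described above could be sketched with pthreads roughly as follows. This is only an illustration under stated assumptions, not the answerer's actual code: the names TasksetWait, add_task, on_completion, await_completion, TrackedTask, run_tracked and run_demo are all hypothetical. It also deviates from the description in two ways: the mutex is taken per call rather than held by the requester for the whole dispatch phase, and the condition is signalled while the mutex is still held, so the waiter cannot destroy the object before the signal has been issued. Plain pthreads stand in for the pool's worker threads.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: counts outstanding tasks and lets one thread wait for zero.
class TasksetWait {
public:
    TasksetWait() : pending(0) {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&completed, NULL);
    }
    ~TasksetWait() {
        pthread_cond_destroy(&completed);
        pthread_mutex_destroy(&lock);
    }
    // Requester: call once per task, before handing the task to the pool.
    void add_task() {
        pthread_mutex_lock(&lock);
        ++pending;
        pthread_mutex_unlock(&lock);
    }
    // Pool thread: call after the task's routine has returned.
    void on_completion() {
        pthread_mutex_lock(&lock);
        assert(pending > 0);
        if (--pending == 0)
            pthread_cond_broadcast(&completed);  // signalled under the lock
        pthread_mutex_unlock(&lock);
    }
    // Requester: block until every counted task has completed.
    void await_completion() {
        pthread_mutex_lock(&lock);
        while (pending > 0)                      // loop guards against spurious wakeups
            pthread_cond_wait(&completed, &lock);
        pthread_mutex_unlock(&lock);
    }
private:
    int pending;
    pthread_mutex_t lock;
    pthread_cond_t completed;
};

// Hypothetical wrapper that carries the real routine plus its TasksetWait.
struct TrackedTask {
    void *(*routine)(void *);
    void *arg;
    TasksetWait *tw;
};

// Has the same signature as a pool task, so it can be dispatched like one.
void *run_tracked(void *p) {
    TrackedTask *t = static_cast<TrackedTask *>(p);
    t->routine(t->arg);
    t->tw->on_completion();
    delete t;
    return NULL;
}

// Stand-in task for the demo: marks its own slot as finished.
void *mark_done(void *p) {
    *static_cast<int *>(p) = 1;
    return NULL;
}

// Demo: n plain threads play the role of pool workers; returns how many
// tasks are observed as finished right after await_completion() returns.
int run_demo(int n) {
    TasksetWait tw;
    std::vector<int> slots(n, 0);
    std::vector<pthread_t> started;
    for (int i = 0; i < n; ++i) {
        TrackedTask *t = new TrackedTask;
        t->routine = mark_done;
        t->arg = &slots[i];
        t->tw = &tw;
        tw.add_task();
        pthread_t th;
        if (pthread_create(&th, NULL, run_tracked, t) == 0)
            started.push_back(th);
        else
            run_tracked(t);                      // fall back to running inline
    }
    tw.await_completion();
    int finished = 0;
    for (int i = 0; i < n; ++i)
        finished += slots[i];
    for (std::size_t i = 0; i < started.size(); ++i)
        pthread_join(started[i], NULL);          // cleanup of the demo threads only
    return finished;
}
```

With the pool from the question, the requester would presumably call tw.add_task() and then dispatch(run_tracked, t, false) for each task, passing false for free_arg because run_tracked deletes the wrapper itself, and finally call tw.await_completion() instead of a pool-wide wait().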

When the requester runs again, it can return the TW to the pool (which may keep a cache of them) or just destroy it.

A variation is for the requester to supply an 'OnCompletion' method to the TW so that the pool thread that finishes the last task can call it, providing asynchronous notification (as might be required to post a message to a GUI input queue).
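A minimal sketch of this asynchronous variation, again with hypothetical names (TasksetNotify, completion_fn, add_task, task_done, seal, count_fired), might look like the following. One assumption goes beyond the description: because the requester no longer blocks, the counter starts at 1 as the requester's own hold, and the requester drops that hold with seal() after its last dispatch. Without something like that, the callback could fire early if the first tasks finish before the remaining ones have been dispatched.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

// Hypothetical callback type: runs once, on whichever thread drops the count to zero.
typedef void (*completion_fn)(void *ctx);

class TasksetNotify {
public:
    TasksetNotify(completion_fn cb, void *cb_ctx)
        : pending(1), fn(cb), ctx(cb_ctx)        // 1 = the requester's own hold
    {
        pthread_mutex_init(&lock, NULL);
    }
    ~TasksetNotify() { pthread_mutex_destroy(&lock); }

    // Requester: call once per task, before handing the task to the pool.
    void add_task() {
        pthread_mutex_lock(&lock);
        assert(pending > 0);                     // adding after completion is a usage error
        ++pending;
        pthread_mutex_unlock(&lock);
    }

    // Pool thread: call after the task's routine has returned.
    void task_done() { release(); }

    // Requester: call once, after the last task has been dispatched.
    void seal() { release(); }

private:
    void release() {
        pthread_mutex_lock(&lock);
        assert(pending > 0);
        bool last = (--pending == 0);
        pthread_mutex_unlock(&lock);
        if (last)
            fn(ctx);                             // invoked outside the lock
    }

    int pending;
    completion_fn fn;
    void *ctx;
    pthread_mutex_t lock;
};

// Example callback for illustration only: counts how many times it fired.
void count_fired(void *ctx) {
    ++*static_cast<int *>(ctx);
}
```

The callback runs on a pool thread (or on the requester, if seal() happens to be the last release), so it should only do something cheap and thread-safe, such as posting a message to the GUI queue mentioned above.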

A mechanism like this allows the threadpool to be used by multiple requester threads or, with asynchronous notification, lets a requester issue multiple task blocks. It can get a bit confusing, though, if the requester thread is itself a pool thread running a task in the pool (best to avoid that if you want to keep any sort of grasp of what your process is actually doing :).