I have implemented a thread pool. Now it works with basic operations as follows:
void initialise( bool detached_threads );
bool dispatch( void *(*dispatch_fn)(void*) , void * arg , bool free_arg );
void shut_down();
static void * execute_task( void * arg );
Now I would like to add a wait() operation, which will be called by the main thread and will block until all the threads in the thread pool have finished the tasks they are executing. I don't want to use pthread_join, because it only returns once a thread has terminated, and I don't want to have to create the thread pool again. I have implemented the wait operation as shown in the code below, but it does not seem to be correct.
Please suggest what is wrong. Thanks!
#include "../inc/ThreadPool.hpp"
#include <cstdio>
#include <cstdlib>
#include <iostream>
using namespace std;
/**
 * Builds an idle pool sized for `n` workers.
 *
 * Only the bookkeeping is prepared here: the thread-handle array is
 * allocated and the mutexes/condition variables are initialised. No worker
 * thread is started by the constructor; initialise() does that.
 *
 * @param n Requested number of worker threads; DEFAULT_THREAD_POOL_SIZE is
 *          used when the value is not positive.
 */
ThreadPool::ThreadPool( unsigned int n )
    : num_threads(n)
{
    // Substitute the default size for a non-positive request.
    if ( num_threads <= 0 )
        num_threads = DEFAULT_THREAD_POOL_SIZE;

    // The pool starts out accepting work and not shutting down.
    shutdown = false;
    dont_accept = false;
    barrier_count = 0;

    // One handle slot per worker; the slots are filled in by initialise().
    threads = static_cast<pthread_t*>( malloc( num_threads * sizeof(pthread_t) ) );

    // Task-queue synchronisation.
    pthread_mutex_init( &q_lock , NULL );
    pthread_cond_init( &q_not_empty , NULL );
    pthread_cond_init( &q_empty , NULL );

    // Barrier synchronisation.
    pthread_mutex_init( &barrier_lock , NULL );
    pthread_cond_init( &barrier_reached , NULL );
}
// Destructor. Intentionally releases nothing: the thread array, mutexes and
// condition variables are released by shut_down(), not here.
ThreadPool::~ThreadPool()
{
//cout << "~ThreadPool()" << endl;
}
/**
 * Starts the worker threads, each running execute_task() on this pool.
 *
 * If a thread cannot be created, the failure is reported on stderr and
 * num_threads is reduced to the number of workers that were actually
 * started, so that shut_down() only joins valid thread handles.
 *
 * @param detached_threads Currently ignored. Workers are always created
 *        joinable because shut_down() calls pthread_join() on each of them,
 *        and joining a detached thread is undefined behaviour.
 */
void ThreadPool::initialise( bool detached_threads )
{
    (void) detached_threads; // see the parameter note above

    for ( int i = 0 ; i<num_threads ; i++ )
    {
        const int rc = pthread_create( &threads[i] , NULL , execute_task , this );
        if ( rc != 0 )
        {
            // pthread_create reports failure through its return value, not errno.
            fprintf( stderr ,
                     "ThreadPool::initialise: pthread_create failed for worker %d (error %d)\n" ,
                     i , rc );
            // threads[i] and everything after it were never started: keep only
            // the workers that exist so later joins stay well-defined.
            num_threads = i;
            break;
        }
    }
}
/**
 * Queues a task for execution by one of the worker threads.
 *
 * @param routine  Function the worker will call.
 * @param arg      Argument passed to `routine`.
 * @param free_arg When true, the worker calls free(arg) after `routine`
 *                 returns.
 * @return true if the task was queued; false if the pool has stopped
 *         accepting work (shut_down() was called) or the task record could
 *         not be allocated. On false, `arg` is NOT freed, even when
 *         `free_arg` is true; it remains owned by the caller.
 */
bool ThreadPool::dispatch( void *(*routine)(void*) , void * arg , bool free_arg )
{
    // Build the task record outside the critical section.
    task_t * new_task = (task_t*) malloc(sizeof(task_t));
    if ( new_task == NULL )
    {
        return false;
    }
    new_task->routine = routine;
    new_task->arg = arg;
    new_task->free_arg = free_arg;

    pthread_mutex_lock(&q_lock);
    if ( dont_accept )
    {
        // Release the lock before bailing out; returning with q_lock held
        // would block every worker and every later dispatch()/wait() call.
        pthread_mutex_unlock(&q_lock);
        free(new_task);
        return false;
    }

    tasks.push(new_task);
    // Signal on every push, not only on the empty -> non-empty transition:
    // otherwise a burst of tasks wakes a single worker while the remaining
    // idle workers keep sleeping. A signal with no waiter is harmless.
    pthread_cond_signal(&q_not_empty);
    pthread_mutex_unlock(&q_lock);
    return true;
}
/**
 * Stops the pool: refuses new tasks, waits for the queue to drain, tells the
 * workers to exit, joins them and releases the pool's resources.
 *
 * After this call the thread array has been freed and the mutexes and
 * condition variables have been destroyed.
 */
void ThreadPool::shut_down()
{
    pthread_mutex_lock( &q_lock );

    // From now on dispatch() rejects new work.
    dont_accept = true;

    // Sleep until the workers have taken every queued task.
    while ( !tasks.empty() )
        pthread_cond_wait( &q_empty , &q_lock );

    // Raise the stop flag and wake every worker idling on an empty queue.
    shutdown = true;
    pthread_cond_broadcast( &q_not_empty );
    pthread_mutex_unlock( &q_lock );

    // Wait for each worker to terminate; their exit values are not needed.
    int joined = 0;
    while ( joined < num_threads )
    {
        pthread_join( threads[joined] , NULL );
        ++joined;
    }

    free( threads );

    // Tear down the synchronisation objects created by the constructor.
    pthread_cond_destroy( &barrier_reached );
    pthread_mutex_destroy( &barrier_lock );
    pthread_cond_destroy( &q_empty );
    pthread_cond_destroy( &q_not_empty );
    pthread_mutex_destroy( &q_lock );
}
// Resets the barrier's arrival counter to zero under barrier_lock.
// barrier() only ever increments barrier_count, so this reset is what makes
// the barrier usable for another round.
void ThreadPool::init_barrier()
{
pthread_mutex_lock(&barrier_lock);
barrier_count = 0;
pthread_mutex_unlock(&barrier_lock);
}
/**
 * Blocks the calling thread until `ns` threads have called barrier().
 *
 * Each caller increments barrier_count; callers that arrive early sleep on
 * barrier_reached, and the caller that brings the count up to `ns` releases
 * them all.
 *
 * The counter is not reset here, so the barrier is single-use until
 * init_barrier() is called again; once the count has reached `ns`, further
 * calls pass straight through.
 *
 * @param ns Number of threads that must arrive before any of them proceeds.
 */
void ThreadPool::barrier( int ns )
{
    pthread_mutex_lock(&barrier_lock);
    barrier_count++;
    if ( barrier_count >= ns )
    {
        // One broadcast releases every waiter, however many there are,
        // instead of issuing `ns` separate signals.
        pthread_cond_broadcast(&barrier_reached);
    }
    else
    {
        // Re-test the predicate after each wake-up to tolerate spurious
        // wake-ups.
        while ( barrier_count < ns )
        {
            pthread_cond_wait(&barrier_reached,&barrier_lock);
        }
    }
    pthread_mutex_unlock(&barrier_lock);
}
// Blocks the caller until the task queue is empty, sleeping on q_empty.
//
// NOTE(review): an empty queue does not mean the work is finished.
// execute_task() wakes q_empty waiters right after it pops the last task,
// BEFORE it runs that task, and other workers may still be inside their
// routines. This function can therefore return while tasks are still
// executing. Waiting for completion would need a count of in-flight tasks
// (incremented when a task is popped, decremented after its routine returns,
// both under q_lock) and a predicate of "queue empty AND count == 0".
void ThreadPool::wait()
{
pthread_mutex_lock(&q_lock);
// Loop so the predicate is re-checked after every wake-up.
while(!(tasks.empty()))
{
pthread_cond_wait(&q_empty,&q_lock);
}
pthread_mutex_unlock(&q_lock);
}
/**
 * Worker thread entry point: repeatedly takes a task from the pool's queue
 * and runs it, until the pool's shutdown flag is set.
 *
 * @param arg Pointer to the owning ThreadPool (passed by initialise()).
 * @return Always NULL; returned when the shutdown flag is observed.
 *
 * NOTE: q_empty is announced as soon as the last task has been popped, i.e.
 * before that task has run. Waiters on q_empty therefore learn that the
 * queue is empty, not that all tasks have completed.
 */
void * ThreadPool::execute_task( void * arg )
{
    ThreadPool * thread_pool = (ThreadPool*) arg;

    while(true)
    {
        pthread_mutex_lock(&(thread_pool->q_lock));

        // Sleep while there is nothing to do, leaving as soon as shutdown
        // is requested (checked both before sleeping and after waking).
        while((thread_pool->tasks).empty())
        {
            if(thread_pool->shutdown)
            {
                pthread_mutex_unlock(&(thread_pool->q_lock));
                // Returning from the start routine ends the thread with this
                // value, exactly as pthread_exit(NULL) would.
                return NULL;
            }
            pthread_cond_wait(&(thread_pool->q_not_empty),&(thread_pool->q_lock));
            if(thread_pool->shutdown)
            {
                pthread_mutex_unlock(&(thread_pool->q_lock));
                return NULL;
            }
        }

        task_t * cur_task = thread_pool->tasks.front();
        thread_pool->tasks.pop();

        if(thread_pool->tasks.empty() && !thread_pool->shutdown )
        {
            // Broadcast rather than signal: wait() and shut_down() can both
            // be blocked on q_empty, and a single signal would wake only one
            // of them, leaving the other asleep.
            pthread_cond_broadcast(&(thread_pool->q_empty));
        }
        pthread_mutex_unlock(&(thread_pool->q_lock));

        // Run the task outside the lock so other workers can take tasks.
        (cur_task->routine)(cur_task->arg);

        if(cur_task->free_arg)
        {
            free(cur_task->arg);
        }
        free(cur_task);
    }
}