// @Author Lin Ya
// @Email xxbbb@vip.qq.com
// This file has not been used
#include "ThreadPool.h"
pthread_mutex_t ThreadPool::lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ThreadPool::notify = PTHREAD_COND_INITIALIZER;
std::vector ThreadPool::threads;
std::vector ThreadPool::queue;
int ThreadPool::thread_count = 0;
int ThreadPool::queue_size = 0;
int ThreadPool::head = 0;
int ThreadPool::tail = 0;
int ThreadPool::count = 0;
int ThreadPool::shutdown = 0;
int ThreadPool::started = 0;
int ThreadPool::threadpool_create(int _thread_count, int _queue_size)
{
bool err = false;
do
{
if(_thread_count <= 0 || _thread_count > MAX_THREADS || _queue_size <= 0 || _queue_size > MAX_QUEUE)
{
_thread_count = 4;
_queue_size = 1024;
}
thread_count = 0;
queue_size = _queue_size;
head = tail = count = 0;
shutdown = started = 0;
threads.resize(_thread_count);
queue.resize(_queue_size);
/* Start worker threads */
for(int i = 0; i < _thread_count; ++i)
{
if(pthread_create(&threads[i], NULL, threadpool_thread, (void*)(0)) != 0)
{
//threadpool_destroy(pool, 0);
return -1;
}
++thread_count;
++started;
}
} while(false);
if (err)
{
//threadpool_free(pool);
return -1;
}
return 0;
}
int ThreadPool::threadpool_add(std::shared_ptr args, std::function)> fun)
{
int next, err = 0;
if(pthread_mutex_lock(&lock) != 0)
return THREADPOOL_LOCK_FAILURE;
do
{
next = (tail + 1) % queue_size;
// 队列满
if(count == queue_size)
{
err = THREADPOOL_QUEUE_FULL;
break;
}
// 已关闭
if(shutdown)
{
err = THREADPOOL_SHUTDOWN;
break;
}
queue[tail].fun = fun;
queue[tail].args = args;
tail = next;
++count;
/* pthread_cond_broadcast */
if(pthread_cond_signal(¬ify) != 0)
{
err = THREADPOOL_LOCK_FAILURE;
break;
}
} while(false);
if(pthread_mutex_unlock(&lock) != 0)
err = THREADPOOL_LOCK_FAILURE;
return err;
}
int ThreadPool::threadpool_destroy(ShutDownOption shutdown_option)
{
printf("Thread pool destroy !\n");
int i, err = 0;
if(pthread_mutex_lock(&lock) != 0)
{
return THREADPOOL_LOCK_FAILURE;
}
do
{
if(shutdown) {
err = THREADPOOL_SHUTDOWN;
break;
}
shutdown = shutdown_option;
if((pthread_cond_broadcast(¬ify) != 0) ||
(pthread_mutex_unlock(&lock) != 0)) {
err = THREADPOOL_LOCK_FAILURE;
break;
}
for(i = 0; i < thread_count; ++i)
{
if(pthread_join(threads[i], NULL) != 0)
{
err = THREADPOOL_THREAD_FAILURE;
}
}
} while(false);
if(!err)
{
threadpool_free();
}
return err;
}
int ThreadPool::threadpool_free()
{
if(started > 0)
return -1;
pthread_mutex_lock(&lock);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(¬ify);
return 0;
}
void *ThreadPool::threadpool_thread(void *args)
{
while (true)
{
ThreadPoolTask task;
pthread_mutex_lock(&lock);
while((count == 0) && (!shutdown))
{
pthread_cond_wait(¬ify, &lock);
}
if((shutdown == immediate_shutdown) ||
((shutdown == graceful_shutdown) && (count == 0)))
{
break;
}
task.fun = queue[head].fun;
task.args = queue[head].args;
queue[head].fun = NULL;
queue[head].args.reset();
head = (head + 1) % queue_size;
--count;
pthread_mutex_unlock(&lock);
(task.fun)(task.args);
}
--started;
pthread_mutex_unlock(&lock);
printf("This threadpool thread finishs!\n");
pthread_exit(NULL);
return(NULL);
}