* 创建线程或者进程的开销是很大的
* 为了防止频繁的创建、销毁线程,提高程序的运行效率
* 往往会建立一个线程池用于多线程程序的调度
* 下面的程序就是完整的线程池实现
*
* 主要通过互斥量和条件变量实现同步
threadpool.h
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
/* 线程体数据结构*/
typedef struct runner
{
void (*callback)(void *arg);//回调函数指针
void *arg; //回调函数参数
struct runner *next;
}thread_runner;
/* 线程池数据结构*/
typedef struct pool
{
pthread_mutex_t mutex; //互斥量
pthread_cond_t cond; //条件变量
thread_runner* runner_head; //线程池中所有等待任务的头指针
thread_runner* runner_tail; //线程池中所有等待任务的尾指针
int shutdown; //线程池是否销毁,0没有注销,1注销
pthread_t* threads; //所有线程
int max_thread_size; //线程池中允许的活动线程数目
}thread_pool;
void run(void *arg);
void threadpool_init(thread_pool *pool, int max_thread_size);
void threadpool_add_runner(thread_pool *pool, void (*callback)(void *arg), void *arg);
void threadpool_destroy(thread_pool **ppool);
#endif //_THREADPOOL_H_
threadpool.c
//======================= threadpool.c ===========================
#include "threadpool.h"
/**********************************************************
* 初始化线程
* 参数:
* pool:指向线程池结构有效地址的动态指针
* max_thread_size:最大的线程数
**********************************************************/
void threadpool_init(thread_pool *pool, int max_thread_size)
{
int iLoop = 0;
/*线程池初始化操作*/
pthread_mutex_init(&(pool->mutex), NULL); //初始化互斥量
pthread_cond_init(&(pool->cond), NULL); //初始化条件变量
pool->shutdown = 0; //线程池默认没有注销
pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t)); //创建所有分离线程
pool->runner_head = NULL;
pool->runner_tail = NULL;
pool->max_thread_size = max_thread_size;
/*创建线程操作*/
for(iLoop; iLoop < max_thread_size; iLoop++)
{
pthread_attr_t attr; //定义线程对象
pthread_attr_init(&attr); //初始化线程的属性
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //设置脱离状态的属性(决定这个线程在终止时是否可以被结合)
pthread_create(&(pool->threads[iLoop]), &attr, (void *)run, (void *)pool); /*threads[i] 动态创建线程;
*第一个参数为指向线程标识符的指针。
*第二个参数用来设置线程属性。
*第三个参数是线程运行函数的起始地址。
*最后一个参数是运行函数的参数。*/
}
printf("threadpool_init-> create %d detached thread\n");
}
/**********************************************************
* 线程体,创建线程后调用的函数
* 参数:
* arg:接收创建线程后传递的参数
**********************************************************/
void run(void *arg)
{
thread_pool *pool = (thread_pool *)arg;
while(1)
{
pthread_mutex_lock(&(pool->mutex)); //加锁
printf("run-> locked!\n");
/*如果等待队列为0并且线程池未销毁,则处于阻塞状态 */
while(pool->runner_head == NULL && !pool->shutdown)
{
pthread_cond_wait(&(pool->cond), &(pool->mutex));
}
/*如果线程已经销毁*/
if(pool->shutdown)
{
pthread_mutex_unlock(&(pool->mutex)); //解锁
printf("run-> unlock and thread exit!\n");
pthread_exit(NULL);
}
thread_runner *runner = pool->runner_head; //取链表的头元素
pool->runner_head = runner->next;
pthread_mutex_unlock(&(pool->mutex)); //解锁
printf("run-> unlocked!\n");
(runner->callback)(runner->arg); //调用回调函数,执行任务
free(runner); //释放线程操作
runner = NULL;
printf("run-> runned and free runner!\n");
}
pthread_exit(NULL);
}
/**********************************************************
* 向线程池加入任务
* 参数:
* pool:指向线程池结构有效地址的动态指针
* callback:线程回调函数
* arg:回调函数参数
**********************************************************/
void threadpool_add_runner(thread_pool *pool, void(*callback)(void *arg), void *arg)
{
thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner));//构建一个新任务
newrunner->callback = callback;
newrunner->arg = arg;
newrunner->next = NULL;
pthread_mutex_lock(&(pool->mutex)); //加锁
printf("threadpool_add_runner-> locked\n");
/*将新任务加入到等待队列中,如果等待队列为空,直接运行当前的线程 */
if(pool->runner_head != NULL)
{
pool->runner_tail->next = newrunner;
pool->runner_tail = newrunner;
}
else
{
pool->runner_head = newrunner;
pool->runner_tail = newrunner;
}
pthread_mutex_unlock(&(pool->mutex)); //解锁
printf("threadpool_add_runner-> unlocked\n");
pthread_cond_signal(&(pool->cond)); //唤醒一个等待线程
printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");
}
/**********************************************************
* 销毁线程池
* 参数:
* ppool:指向线程池结构有效地址的动态指针地址(二级指针)
**********************************************************/
void threadpool_destroy(thread_pool **ppool)
{
thread_pool *pool = *ppool;
/*判断线程池是否注销,防止二次销毁*/
if(!pool->shutdown)
{
pool->shutdown = 1;
pthread_cond_broadcast(&(pool->cond)); //唤醒所有的等待线程,线程池要销毁了
sleep(1); //等待所有的线程终止
printf("threadpool_destroy-> wakeup all waitting threads\n");
free(pool->threads); //回收空间
/*销毁等待队列*/
thread_runner *head = NULL;
while(pool->runner_head != NULL)
{
head = pool->runner_head;
pool->runner_head = pool->runner_head->next;
free(head);
}
printf("thread_destroy-> all runners freed\n");
pthread_mutex_destroy(&(pool->mutex)); //销毁条件变量
pthread_cond_destroy(&(pool->cond)); //销毁互斥量
printf("thread_destroy-> mutex and cond destroyed\n");
free(pool);
pool = NULL;
(*ppool) = NULL;
printf("threadpool_destroy-> pool freed\n");
}
}
//======================= end threadpool.c ===========================
main.c
#include "threadpool.h"
void threadrun(void *arg)
{
int i = *(int *)arg;
printf("threadrun result == %d\n", i);
}
int main(int argc, char *argv[])
{
thread_pool *pool = (thread_pool *)malloc(sizeof(thread_pool));
threadpool_init(pool, 9);
int i;
int tmp[10];
for(i = 0; i < 10; i++)
{
tmp[i] = i;
threadpool_add_runner(pool, threadrun, &tmp[i]);
}
sleep(1);
threadpool_destroy(&pool);
printf("main-> %p\n", pool);
printf("main-> test over\n");
return 0;
}
Makefile
#Makefile
main: mian.o threadpool.o
gcc -o main test.o threadpool.o -lpthread
mian.o: threadpool.h
gcc -c main.c -lpthread
hreadpool.o: threadpool.h
gcc -c threadpool.c -lpthread
.PHONY:clean
clean:
rm -f *.o main