It has been about a month since my last long blog post. I recently found some time to put together the threading part of the memcached source code analysis series; while reading the source I also consulted some resources found online.
This post focuses on two questions: (1) how the memcached thread pool is created, and (2) how the threads in the pool are scheduled. All the answers come from the source code.
memcached's thread pool follows the fairly typical Master-Worker model:
(1) The main thread listens for incoming client connection requests, accepts the connections, and puts the connected sockets into a connection queue;
(2) It then dispatches idle worker threads to handle the read/write and other events on those established connections.
1 Key data abstractions
(1) The wrapper structure for a single memcached thread
// The wrapper structure for a memcached worker thread
typedef struct {
    pthread_t thread_id;               /* unique ID of this thread */
    struct event_base *base;           /* libevent handle this thread uses */
    struct event notify_event;         /* listen event for notify pipe */
    int notify_receive_fd;             /* receiving end of notify pipe */
    int notify_send_fd;                /* sending end of notify pipe */
    struct thread_stats stats;         /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;             /* suffix cache */
} LIBEVENT_THREAD;
This is the wrapper structure for a thread in memcached. As you can see, each thread owns a CQ connection queue, a notification pipe, a libevent instance (event_base), and so on.
(2) The per-thread connection queue
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};
Each thread structure points to one CQ queue, and the CQ queue manages a singly linked list of CQ_ITEMs.
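The queue helpers cq_init/cq_push/cq_pop are not listed in this post; the following is a simplified sketch of what they look like, modeled on thread.c and slightly trimmed, so treat it as illustrative rather than a verbatim copy:

/* Initialize a connection queue (simplified sketch of thread.c). */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Pop one item off the head of the queue; returns NULL if the queue is empty. */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/* Append one item to the tail of the queue. */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}

Note that in the worker path the threads are actually woken through the notify pipe rather than the condition variable; the signal in cq_push is simply part of the generic queue interface.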
(3) The connection queue item
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;
};
A CQ_ITEM is essentially a wrapper around the connected fd returned by the main thread's accept(): the main thread creates and initializes it and pushes it onto a CQ queue for the worker threads to consume.
(4) The network connection structure
/**
 * The structure representing a connection into memcached.
 */
// memcached's abstraction of a single connection
typedef struct conn conn;
struct conn {
    ..................
};
Since this structure is quite large, its members are omitted here. One member, however, is crucial for our discussion of the thread pool: state, which drives memcached's connection state machine (implemented by the drive_machine function).
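To give a sense of how state is used: conceptually, drive_machine is a loop around a switch on c->state. The sketch below is heavily abridged and only meant to show the shape of that state machine; the state names follow memcached's conn_states enum, but the case bodies here are placeholders, not the real logic:

// Heavily abridged sketch of the idea behind drive_machine(); not the real code.
static void drive_machine(conn *c) {
    bool stop = false;

    while (!stop) {
        switch (c->state) {
        case conn_listening:
            // main thread: accept() the new socket and hand it to a worker,
            // then keep listening
            stop = true;
            break;
        case conn_waiting:
            // wait until the socket becomes readable, then move on
            conn_set_state(c, conn_read);
            stop = true;
            break;
        case conn_read:
            // read data from the socket, then go parse the command
            conn_set_state(c, conn_parse_cmd);
            break;
        case conn_closing:
            // tear the connection down
            conn_close(c);
            stop = true;
            break;
        default:
            stop = true;
            break;
        }
    }
}

The worker's libevent callback for a connection essentially just calls drive_machine, so the state field alone determines what happens when the socket becomes readable or writable.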
2 Thread pool initialization
The thread pool is initialized from main() via:
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);
The function is defined in thread.c; its source is shown below:
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    // allocate the array of thread structures for the pool
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    // create a notification pipe for each thread in the pool
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        // fill in the rest of the thread structure
        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        // spawn nthreads worker threads; worker_libevent is the thread routine
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
The thread pool initialization function is called by the main thread. It first initializes the various mutexes, then uses calloc to allocate nthreads * sizeof(LIBEVENT_THREAD) bytes to manage the pool, storing the result in the global static variable threads (of type LIBEVENT_THREAD *). It then creates an anonymous pipe for each thread (this pipe plays a key role in thread scheduling), and calls setup_thread to set up each thread's event monitoring, bind its CQ queue, and fill in the other initialization details. The source of setup_thread is shown below:
/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    // register a read event on the notify pipe; thread_libevent_process is the callback
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    // allocate this thread's CQ connection queue
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    // initialize the CQ queue inside the thread structure
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }
    // create the suffix cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
memcached uses libevent to implement its event loop; readers unfamiliar with libevent can consult its documentation, as it is not introduced here. The following line in the source:
event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me);
registers a read event on me->notify_receive_fd (the read end of the anonymous pipe), with thread_libevent_process as the callback. The callback is defined as follows:
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    // the pipe became readable: consume the 1 byte the main thread wrote
    // into it (see dispatch_conn_new())
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    // take one pending connection off this thread's queue
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        // build a new conn (a new task) for this connection
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
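The write side of this handshake lives in dispatch_conn_new(), which the next post will cover in detail. For orientation, here is a simplified sketch of the idea, modeled on thread.c and abridged, so treat it as illustrative rather than verbatim:

// Simplified sketch of how the main thread hands a new connection to a worker
// (modeled on dispatch_conn_new() in thread.c; abridged, not verbatim).
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();                            // grab a CQ_ITEM
    int tid = (last_thread + 1) % settings.num_threads;   // round-robin pick a worker
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    // describe the accepted connection
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    // push it onto the chosen worker's CQ queue ...
    cq_push(thread->new_conn_queue, item);

    // ... and write 1 byte into that worker's notify pipe, which wakes up
    // thread_libevent_process() inside the worker's event loop
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}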
After setup_thread has filled in each thread structure, we return to thread_init, which then loops nthreads times calling create_worker(worker_libevent, &threads[i]); to create the actual running threads. create_worker is a thin wrapper around pthread_create(): worker_libevent is each thread's start routine and &threads[i] is the argument passed to it.
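create_worker is not listed elsewhere in this post; it is roughly the following (based on thread.c, lightly paraphrased):

/*
 * Creates a worker thread (roughly as in thread.c).
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        exit(1);
    }
}

Note that the pthread_t handle is a local variable and the workers are never joined: they are expected to live for the life of the process.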
worker_libevent is the thread routine; its source:
/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    pthread_mutex_lock(&init_lock);
    init_count++;                      // each newly created thread bumps the global init_count
    pthread_cond_signal(&init_cond);   // and signals init_cond
    pthread_mutex_unlock(&init_lock);

    // the new thread blocks here, waiting for events
    event_base_loop(me->base, 0);      // libevent's main event loop
    return NULL;
}
The reason worker_libevent increments init_count becomes clear from this part of thread_init:
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
while (init_count < nthreads) {
    pthread_cond_wait(&init_cond, &init_lock);
}
pthread_mutex_unlock(&init_lock);
That is, the main thread blocks here waiting for the init_cond signal sent by worker_libevent; each time it wakes up, it checks whether init_count < nthreads has become false (i.e., whether the required number of threads has been created), and keeps waiting otherwise.
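This "wait until N threads have checked in" pattern is worth internalizing on its own. The following is a minimal standalone demo of the same idea (a hypothetical example, not memcached code); build it with something like cc -pthread demo.c:

// Minimal standalone demo of the startup barrier memcached uses:
// each worker bumps a counter and signals; the main thread waits until
// the counter reaches the expected number of workers.
// (Hypothetical example, not memcached code.)
#include <pthread.h>
#include <stdio.h>

#define NTHREADS 4

static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  init_cond = PTHREAD_COND_INITIALIZER;
static int init_count = 0;

static void *worker(void *arg) {
    /* ... per-thread setup would happen here ... */

    pthread_mutex_lock(&init_lock);
    init_count++;                     // announce "I'm ready"
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    /* ... a real worker would now enter its event loop ... */
    return NULL;
}

int main(void) {
    pthread_t tids[NTHREADS];

    for (int i = 0; i < NTHREADS; i++)
        pthread_create(&tids[i], NULL, worker, NULL);

    // block until every worker has finished its setup
    pthread_mutex_lock(&init_lock);
    while (init_count < NTHREADS)
        pthread_cond_wait(&init_cond, &init_lock);
    pthread_mutex_unlock(&init_lock);

    printf("all %d workers initialized\n", init_count);

    for (int i = 0; i < NTHREADS; i++)
        pthread_join(tids[i], NULL);
    return 0;
}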
That wraps up the analysis of the thread pool creation code. Since this post is already quite long, the scheduling of the threads in the pool will be analyzed in a separate post.