(转)Memcached多线程模型

原文

关键数据结构

  1. CQ_ITEM
typedef struct conn_queue_item CQ_ITEM;        
struct conn_queue_item {                
    int                     sfd;                
    enum conn_states        init_state;               
    int                     event_flags;                
    int                     read_buffer_size;                
    enum network_transport  transport;                
    CQ_ITEM                  *next;        
};

可以将这个结构体看着是主线程accept触发时即有客户端连入时,主线程写入工作线程有关socket连接相关句柄数据结构,绑定了socket描述符、状态、发生的事件、读buffer大小等,不难对号入座。

  1. CQ
typedef struct conn_queue CQ;        
struct conn_queue {            
      CQ_ITEM *head;            
      CQ_ITEM *tail;            
      pthread_mutex_t lock;            
      pthread_cond_t  cond;        
};

这是socket连接通知队列。

  1. LIBEVENT_THREAD
typedef struct {                
      pthread_t thread_id;        /* unique ID of this thread */                
      struct event_base *base;    /* libevent handle this thread uses */ 
      struct event notify_event;  /* listen event for notify pipe */
      int notify_receive_fd;      /* receiving end of notify pipe */
      int notify_send_fd;         /* sending end of notify pipe */
      struct thread_stats stats;  /* Stats generated by this thread */
      struct conn_queue *new_conn_queue; /* queue of new connections to handle */                
      cache_t *suffix_cache;      /* suffix cache */                
      uint8_t item_lock_type;     /* use fine-grained or global item lock */        
} LIBEVENT_THREAD;

可以将这个结构体看着是线程句柄数据结构,绑定了线程ID、Libevent实例、用于通知管道的event、通知接收的socket描述符,通知发送的socket描述符、socket连接通知队列,这也很好对号入座吧。

  1. conn
typedef struct conn conn;        
struct conn {            
      int    sfd;       
       ...            
      struct event event;            
      short  ev_flags;            
      short  which;   /** which events were just triggered */    
        ...            
      LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */        
};

这个结构非常庞大,提取关键的几个字段来分析一下, 可以将这个结构体看着是socket连接句柄数据结构,绑定了socket描述符、触发的事件、处理连接的线程指针等,这个也很好对号入座吧。

整体流程

《(转)Memcached多线程模型》 �整体流程

  1. 在main函数中调用main_base = event_init()来初始化主线程Libevent实例。
  2. 在main函数中调用thread_init来初始化工作线程,并将主线程Libevent实例作为参数传入。
  3. 在thread_init函数中为指定数量的工作线程分配内存,为每个线程创建管道,并分别绑定到通知收和发的socket描述符上,调用函数setup_thread初始化线程信息,调用函数create_worker为每个线程注册回调函数。关键代码:
for (i = 0; i < nthreads; i++) {
      int fds[2];
      if (pipe(fds)) {
      ...
      }

      threads[i].notify_receive_fd = fds[0];
      threads[i].notify_send_fd = fds[1];

      setup_thread(&threads[i]);
      ...
}

  // Create threads after we've done all the libevent setup. 
  for (i = 0; i < nthreads; i++) {
       create_worker(worker_libevent, &threads[i]);
  }
  1. 在setup_thread函数中,为工作线程初始化Libevent实例,为主线程通知读(notify_receive_fd)注册回调函数thread_libevent_process,初始化cq队列,关键代码如下:
static void setup_thread(LIBEVENT_THREAD *me) {
       me->base = event_init();
       ...
       /* Listen for notifications from other threads */
       event_set(&me->notify_event, me->notify_receive_fd,
                 EV_READ | EV_PERSIST, thread_libevent_process, me);
       event_base_set(me->base, &me->notify_event);

       if (event_add(&me->notify_event, 0) == -1) {
               ...
       }

       me->new_conn_queue = malloc(sizeof(struct conn_queue));
       ...
       cq_init(me->new_conn_queue);
       ...
}
  1. 在thread_libevent_process函数中,读取主线程发送的通知接收消息,将主线程accept来的fd注册到工作线程的Libevent实例中,主线程accept来的fd从conn_queue队列获取,关键代码如下:
static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];

        if (read(fd, buf, 1) != 1)
                ...

        switch (buf[0]) {
        case 'c':
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
                conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport, me->base);
       ...
        }
}
  1. 在函数conn_new中,创建conn句柄,为句柄注册回调函数event_handler处理事件,将该句柄作为参数传入回调函数并设置到Libevent中,该函数的关键代码如下:
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, 
                enum network_transport transport,
                struct event_base *base) {
      conn *c = conn_from_freelist();

      if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
                ...
        }
        ...
        event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
        event_base_set(base, &c->event);
        c->ev_flags = event_flags;

        if (event_add(&c->event, 0) == -1) {
              ...
        }
        ...
      }
}
  1. 在create_worker函数中,创建工作线程并注册回调函数,在工作线程的回调函数work_libevent中,开始Libevent主循环。
  2. 在main函数中,调用函数server_sockets,再调用函数server_socket,进而调用函数new_socket,在调用函数conn_new,创建并注册listen fd到主线程Libevent实例上,最后开始Libevent主循环即event_base_loop。在conn_new函数关键代码见步骤(6)
  3. 在event_handler函数中,调用函数drive_machine,在该函数中处理所有事件,其关键代码如下:
static void drive_machine(conn *c) {
        ...
        while (!stop) {
            switch(c->state) {
                case conn_listening:
                        addrlen = sizeof(addr);
                        if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                               ...                                       
                        }
                       ...
                       if (settings.maxconns_fast &&
                            stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                                ...
                       } else {
                            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
                      }

                    stop = true;
                    break;
                    ...
                }
            }

        return;
}

在处理事件时,如果是listening事件,则调用函数dispatch_conn_new将accept fd分配给工作线程。

  1. 在dispatch_conn_new函数中,根据round-robin算法将新连接push到所分配线程的CQ队列中,并通过管道发送通知消息“c”,关键代码如下:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
        CQ_ITEM *item = cqi_new();
        char buf[1];
        int tid = (last_thread + 1) % settings.num_threads;

        LIBEVENT_THREAD *thread = threads + tid;

        last_thread = tid;

        ...

        cq_push(thread->new_conn_queue, item);
        ...
        buf[0] = 'c';
        if (write(thread->notify_send_fd, buf, 1) != 1) {
            perror("Writing to thread notify pipe");
        }
}

dispatch_conn_new函数只在主线程中调用,last_thread为静态变量,每次将该变量值+1,再模线程数来选择工作线程。

线程模型

Libevent本身是单线程的,Memcached采用消息通知+同步层机制使得其支持多线程,整体模型见如下神图:

《(转)Memcached多线程模型》 �线程模型

每个线程包括主线程都各自有独立的Libevent实例,Memcached的listen fd注册到主线程的Libevent实例上,由主线程来accept新的连接,接受新的连接后根据Round-robin算法选择工作线程,将新连接的socket fd封装为CQ_ITEM后push到所选工作线程的CQ队列中,然后主线程(notify_send_fd)通过管道发送字符“c”到工作线程(notify_receive_fd),而notify_receive_fd已经注册到工作线程的Libevent实例上了,这样工作线程就能收到通知“c”,然后从该工作线程的CQ队列中pop出CQ_ITEM进而取出新连接并将fd注册到工作线程的Libevent实例上,从而由工作线程来处理该连接的所有后续事件。 需要注意的数据:Memcached默认开启线程数为4,也可以通过参数-t来指定开启线程数,当线程数大于64时会给出错误提示,建议线程数为小于或等于CPU核数。

    原文作者:lcode
    原文地址: https://www.jianshu.com/p/98a02d771b2d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞