ZooKeeper C client 源代码阅读笔记

整体感觉:代码写得比较乱,风格不太统一;一个函数往往有多个出口,却没有用好goto来统一做错误处理;注释也比较少,阅读起来不太舒服。
阅读前需要对Linux系统调用比较了解,另外还需要了解poll这种I/O多路复用机制。

为了简单起见,去掉了Windows相关的宏定义及其代码。

ZooKeeper C client笔记

zhandle(struct _zhandle)表示一个客户端连接,源代码位于zk_adaptor.h:

/* Per-connection client state: the socket and server address list, timing
 * bookkeeping, the inbound/outbound buffer queues, the request/completion
 * queues, session identity and the watcher tables. */
struct _zhandle {
    int fd; /* the descriptor used to talk to zookeeper */
    char *hostname; /* the hostname of zookeeper */
    struct sockaddr_storage *addrs; /* the addresses that correspond to the hostname */
    int addrs_count; /* The number of addresses in the addrs array */
    watcher_fn watcher; /* the registered watcher */
    struct timeval last_recv; /* The time that the last message was received */
    struct timeval last_send; /* The time that the last message was sent */
    struct timeval last_ping; /* The time that the last PING was sent */
    struct timeval next_deadline; /* The time of the next deadline */
    int recv_timeout; /* The maximum amount of time that can go by without 
     receiving anything from the zookeeper server */
    buffer_list_t *input_buffer; /* the current buffer being read in */
    buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
    buffer_head_t to_send; /* The packets queued to send */
    completion_head_t sent_requests; /* The outstanding requests */
    completion_head_t completions_to_process; /* completions that are ready to run */
    int connect_index; /* The index of the address to connect to */
    /* Session id and password. Sent in the handshake request and overwritten
     * from the connect response once the session is established. */
    clientid_t client_id;
    /* Most recent positive zxid seen in a reply header; also reported to the
     * server as lastZxidSeen during the handshake. */
    long long last_zxid;
    int outstanding_sync; /* Number of outstanding synchronous requests */
    struct _buffer_list primer_buffer; /* The buffer used for the handshake at the start of a connection */
    struct prime_struct primer_storage; /* the connect response */
    char primer_storage_buffer[40]; /* the true size of primer_storage */
    /* Connection state, e.g. ZOO_CONNECTING_STATE, ZOO_ASSOCIATING_STATE,
     * ZOO_CONNECTED_STATE or ZOO_EXPIRED_SESSION_STATE. */
    volatile int state;
    /* NOTE(review): presumably an opaque caller-supplied pointer handed back
     * to the watcher -- confirm against the public API. */
    void *context;
    auth_list_head_t auth_h; /* authentication data list */
    /* zookeeper_close is not reentrant because it de-allocates the zhandler. 
     * This guard variable is used to defer the destruction of zhandle till 
     * right before top-level API call returns to the caller */
    int32_t ref_counter;
    /* Non-zero once shutdown has been requested; the IO and completion thread
     * loops exit when they observe it. */
    volatile int close_requested;
    /* Adaptor-private data; the multi-threaded adaptor keeps its
     * struct adaptor_threads here. */
    void *adaptor_priv;
    /* Used for debugging only: non-zero value indicates the time when the zookeeper_process
     * call returned while there was at least one unprocessed server response 
     * available in the socket recv buffer */
    struct timeval socket_readable;
    
    /* NOTE(review): judging by the names these hold the currently registered
     * data, exists and child watchers respectively -- confirm. */
    zk_hashtable* active_node_watchers;   
    zk_hashtable* active_exist_watchers;
    zk_hashtable* active_child_watchers;
    /** used for chroot path at the client side **/
    char *chroot;
};

ZooKeeper client初始化的时候,会开两个线程,一个叫IO线程,一个叫completion线程。IO线程的执行函数是do_io,位于mt_adaptor.c。

IO线程

do_io的源代码如下(将windows的版本相关的代码去掉了):


/**
 * Entry point of the IO thread.
 *
 * Each iteration asks zookeeper_interest() which socket and which readiness
 * events matter, waits in poll() on the self-pipe (and on the socket when
 * there is one), converts the poll results back into ZOOKEEPER_READ /
 * ZOOKEEPER_WRITE flags and passes them to zookeeper_process(). The loop ends
 * when close has been requested on the handle or the handle has become
 * unrecoverable.
 *
 * @param v  the zhandle_t of this client, passed as void*
 * @return   always 0
 */
void *do_io(void *v)
{
    zhandle_t *zh = (zhandle_t*)v;
    struct pollfd fds[2];
    struct adaptor_threads *adaptor_threads = zh->adaptor_priv;

    api_prolog(zh);
    notify_thread_ready(zh);
    LOG_DEBUG(("started IO thread"));
    /* slot 0 is always the read end of the self-pipe */
    fds[0].fd=adaptor_threads->self_pipe[0];
    fds[0].events=POLLIN;
    while(!zh->close_requested) {
        struct timeval tv;
        int fd;
        int interest;
        int timeout;
        int maxfd=1;

        zookeeper_interest(zh, &fd, &interest, &tv);
        if (fd != -1) {
            /* slot 1 is the zookeeper socket, polled only when one exists */
            fds[1].fd=fd;
            fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
            fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
            maxfd=2;
        }
        /* poll() takes its timeout in milliseconds */
        timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);

        /* The result of poll() is not checked below, and revents is not
         * guaranteed to be written when poll() fails. Clear it first so a
         * failed call is seen as "nothing ready" rather than as indeterminate
         * or stale flags. */
        fds[0].revents=0;
        fds[1].revents=0;
        poll(fds,maxfd,timeout);
        if (fd != -1) {
            interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
            /* a hang-up is reported as writable as well */
            interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
        }
        if(fds[0].revents&POLLIN){
            // flush the pipe
            char b[128];
            while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
        }
        // dispatch zookeeper events
        zookeeper_process(zh, interest);
        // check the current state of the zhandle and terminate 
        // if it is_unrecoverable()
        if(is_unrecoverable(zh))
            break;
    }
    api_epilog(zh, 0);
    LOG_DEBUG(("IO thread terminated"));
    return 0;
}

zookeeper_process

/**
 * Run one round of socket work and dispatch every reply that is ready.
 *
 * check_events() is called first to do the I/O selected by `events`. Each
 * buffer it left on zh->to_process is then decoded and routed by xid:
 *  - PING_XID: the round-trip time is logged and the buffer is freed
 *  - WATCHER_EVENT_XID: the matching watchers are collected and a completion
 *    is queued on zh->completions_to_process
 *  - SET_WATCHES_XID: the buffer is freed
 *  - AUTH_XID: auth_completion_func() is run; if the handle is unrecoverable
 *    afterwards the call fails with ZAUTHFAILED
 *  - anything else: the reply is matched against the head of
 *    zh->sent_requests and either queued for asynchronous completion or
 *    handed to the waiting synchronous caller
 *
 * @param zh      client handle
 * @param events  bitmask of ZOOKEEPER_READ / ZOOKEEPER_WRITE
 * @return ZBADARGUMENTS when zh is NULL, ZINVALIDSTATE when the handle is
 *         already unrecoverable; otherwise the value returned by
 *         api_epilog(), which is called with ZOK on normal completion or
 *         with the error code of the step that failed.
 */
int zookeeper_process(zhandle_t *zh, int events)
{
    buffer_list_t *bptr;
    int rc;

    if (zh==NULL)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    api_prolog(zh);
    IF_DEBUG(checkResponseLatency(zh));
    // Depending on the handle state and `events`, check_events() sends or
    // receives data: while connecting it sends the session handshake, on the
    // handshake reply it moves the handle to the connected state, and every
    // other reply is queued on zh->to_process for the loop below.
    rc = check_events(zh, events);
    if (rc!=ZOK)
        return api_epilog(zh, rc);

    IF_DEBUG(isSocketReadable(zh));

    while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
        struct ReplyHeader hdr;
        struct iarchive *ia = create_buffer_iarchive(
                                    bptr->buffer, bptr->curr_offset);
        deserialize_ReplyHeader(ia, "hdr", &hdr);
        /* only a positive zxid advances last_zxid */
        if (hdr.zxid > 0) {
            zh->last_zxid = hdr.zxid;
        }

        if (hdr.xid == PING_XID) {
            // Ping replies can arrive out-of-order
            int elapsed = 0;
            struct timeval now;
            gettimeofday(&now, 0);
            elapsed = calculate_interval(&zh->last_ping, &now);
            LOG_DEBUG(("Got ping response in %d ms", elapsed));
            free_buffer(bptr);
        } else if (hdr.xid == WATCHER_EVENT_XID) {
            struct WatcherEvent evt;
            int type = 0;
            char *path = NULL;
            completion_list_t *c = NULL;

            LOG_DEBUG(("Processing WATCHER_EVENT"));

            deserialize_WatcherEvent(ia, "event", &evt);
            type = evt.type;
            path = evt.path;
            /* We are doing a notification, so there is no pending request */
            c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
            /* the buffer now belongs to the completion entry */
            c->buffer = bptr;
            c->c.watcher_result = collectWatchers(zh, type, path);

            // We cannot free until now, otherwise path will become invalid
            deallocate_WatcherEvent(&evt);
            queue_completion(&zh->completions_to_process, c, 0);
        } else if (hdr.xid == SET_WATCHES_XID) {
            LOG_DEBUG(("Processing SET_WATCHES"));
            free_buffer(bptr);
        } else if (hdr.xid == AUTH_XID){
            LOG_DEBUG(("Processing AUTH_XID"));

            /* special handling for the AUTH response as it may come back
             * out-of-band */
            auth_completion_func(hdr.err,zh);
            free_buffer(bptr);
            /* authentication completion may change the connection state to
             * unrecoverable */
            if(is_unrecoverable(zh)){
                handle_error(zh, ZAUTHFAILED);
                close_buffer_iarchive(&ia);
                return api_epilog(zh, ZAUTHFAILED);
            }
        } else {
            /* named differently from the outer rc so it does not shadow it */
            int reply_rc = hdr.err;
            /* Find the request corresponding to the response */
            completion_list_t *cptr = dequeue_completion(&zh->sent_requests);

            /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
            if (zh->close_requested == 1 && cptr == NULL) {
                LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                return api_epilog(zh,ZINVALIDSTATE);
            }
            assert(cptr);
            /* The requests are going to come back in order */
            if (cptr->xid != hdr.xid) {
                LOG_DEBUG(("Processing unexpected or out-of-order response!"));

                // received unexpected (or out-of-order) response
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                // put the completion back on the queue (so it gets properly
                // signaled and deallocated) and disconnect from the server
                queue_completion(&zh->sent_requests,cptr,1);
                // "expected" is the xid of the oldest outstanding request,
                // "received" is the xid carried by the reply header
                return api_epilog(zh,
                                  handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
                                  "unexpected server response: expected %#x, but received %#x",
                                  cptr->xid,hdr.xid));
            }

            activateWatcher(zh, cptr->watcher, reply_rc);
            // asynchronous call: hand the reply to the completion queue
            if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
                LOG_DEBUG(("Queueing asynchronous response"));
                cptr->buffer = bptr;
                queue_completion(&zh->completions_to_process, cptr, 0);
            } else {
                // synchronous call: fill in the result and wake the waiter
                struct sync_completion
                        *sc = (struct sync_completion*)cptr->data;
                sc->rc = reply_rc;

                process_sync_completion(cptr, sc, ia, zh);

                notify_sync_completion(sc);
                free_buffer(bptr);
                zh->outstanding_sync--;
                destroy_completion_entry(cptr);
            }
        }

        close_buffer_iarchive(&ia);

    }
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    return api_epilog(zh,ZOK);
}

check_events:

// Performs the socket work selected by `events` for the handle's current state:
//  - still connecting and the socket is writable: verify the non-blocking
//    connect succeeded and send the session handshake
//  - outbound data is queued and the socket is writable: flush the send queue
//  - the socket is readable: receive a buffer; a handshake reply completes
//    session establishment, any other reply is queued on zh->to_process
// Returns ZINVALIDSTATE when there is no socket, ZNOTHING when a read was
// requested but no complete buffer was available, the result of
// handle_socket_error_msg() or prime_connection() on failure, ZOK otherwise.
// (Original author's note: "check_events" is a poor name for all of this.)
static int check_events(zhandle_t *zh, int events)
{
    if (zh->fd == -1)
        return ZINVALIDSTATE;
    // connection attempt still in progress
    if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
        int rc, error;
        socklen_t len = sizeof(error);
        rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
        /* the description in section 16.4 "Non-blocking connect"
         * in UNIX Network Programming vol 1, 3rd edition, points out
         * that sometimes the error is in errno and sometimes in error */
        if (rc < 0 || error) {
            if (rc == 0)
                errno = error;
            return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                "server refused to accept the client");
        }
        if((rc=prime_connection(zh))!=0)
            return rc;
        LOG_INFO(("initiated connection to server [%s]",
                format_endpoint_info(&zh->addrs[zh->connect_index])));
        return ZOK;
    }
    if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
        /* make the flush call non-blocking by specifying a 0 timeout */
        int rc=flush_send_queue(zh,0);
        if (rc < 0)
            return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
                "failed while flushing send queue");
    }
    if (events&ZOOKEEPER_READ) {
        int rc;
        if (zh->input_buffer == 0) {
            zh->input_buffer = allocate_buffer(0,0);
        }
        // read data: -1 means failure, 0 means the read would block,
        // 1 means a buffer was read successfully
        rc = recv_buffer(zh->fd, zh->input_buffer);
        if (rc < 0) {
            return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                "failed while receiving a server response");
        }
        if (rc > 0) {
            gettimeofday(&zh->last_recv, 0);
            // anything other than the handshake reply goes onto the queue
            if (zh->input_buffer != &zh->primer_buffer) {
                queue_buffer(&zh->to_process, zh->input_buffer, 0);
            } else  { // this is the reply to the connection handshake
                int64_t oldid,newid;
                // deserialize the received bytes (zh->primer_buffer.buffer)
                // into zh->primer_storage
                deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
                /* We are processing the primer_buffer, so we need to finish
                 * the connection handshake */
                oldid = zh->client_id.client_id;
                newid = zh->primer_storage.sessionId;
                // the session id came back different from the one we sent
                if (oldid != 0 && oldid != newid) {
                    zh->state = ZOO_EXPIRED_SESSION_STATE;
                    errno = ESTALE;
                    return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
                            "sessionId=%#llx has expired.",oldid);
                } else {
                    zh->recv_timeout = zh->primer_storage.timeOut;
                    zh->client_id.client_id = newid;
                 
                    memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                           sizeof(zh->client_id.passwd));
                    // mark the handle as connected
                    zh->state = ZOO_CONNECTED_STATE;
                    LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
                              format_endpoint_info(&zh->addrs[zh->connect_index]),
                              newid, zh->recv_timeout));
                    /* we want the auth to be sent for, but since both call push to front
                       we need to call send_watch_set first */
                    send_set_watches(zh);
                    /* send the authentication packet now */
                    send_auth_info(zh);
                    LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
                    zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
                    PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
                }
            }
            // the buffer has been queued or consumed above, so the next read
            // starts with a fresh one
            zh->input_buffer = 0;
        } else {
            // zookeeper_process was called but there was nothing to read
            // from the socket
            return ZNOTHING;
        }
    }
    return ZOK;
}

prime_connection负责第一次和ZooKeeper建立连接:

/**
 * Send the session handshake on a freshly connected socket.
 *
 * A connect request is built from the handle's session id, password,
 * receive timeout and last seen zxid, then written to the socket as a
 * 4-byte network-order length followed by the serialized request. On
 * success the handle moves to ZOO_ASSOCIATING_STATE and its input buffer is
 * pointed at the primer buffer so that the next read receives the reply.
 *
 * @param zh  client handle whose fd is the connected socket
 * @return ZOK on success, otherwise the value returned by
 *         handle_socket_error_msg() for ZCONNECTIONLOSS
 */
static int prime_connection(zhandle_t *zh)
{
    /* scratch space that receives the serialized request */
    char payload[HANDSHAKE_REQ_SIZE];
    int payload_len = sizeof(payload);
    int wire_len = 0;
    int status;
    struct connect_req handshake;

    /* fill in the connect request */
    handshake.protocolVersion = 0;
    handshake.sessionId = zh->client_id.client_id;
    handshake.timeOut = zh->recv_timeout;
    handshake.lastZxidSeen = zh->last_zxid;
    handshake.passwd_len = sizeof(handshake.passwd);
    memcpy(handshake.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));

    /* We are running fast and loose here, but this string should fit in the initial buffer! */
    /* length prefix first ... */
    wire_len = htonl(payload_len);
    status = zookeeper_send(zh->fd, &wire_len, sizeof(payload_len));

    /* ... then the serialized request, unless the prefix already failed */
    serialize_prime_connect(&handshake, payload);
    if (status >= 0) {
        status = zookeeper_send(zh->fd, payload, payload_len);
    }
    if (status < 0) {
        return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
                "failed to send a handshake packet: %s", strerror(errno));
    }

    /* handshake sent: the handle is now associating */
    zh->state = ZOO_ASSOCIATING_STATE;

    /* This seems a bit weird to to set the offset to 4, but we already have a
     * length, so we skip reading the length (and allocating the buffer) by
     * saying that we are already at offset 4 */
    zh->input_buffer = &zh->primer_buffer;
    zh->input_buffer->curr_offset = 4;

    return ZOK;
}

completion线程

completion线程总体上比IO线程的代码要简单很多,其线程的入口函数如下:

void *do_completion(void *v)
{
    zhandle_t *zh = v;
    api_prolog(zh);
    notify_thread_ready(zh);
    LOG_DEBUG(("started completion thread"));
    while(!zh->close_requested) {
        pthread_mutex_lock(&zh->completions_to_process.lock);
        while(!zh->completions_to_process.head && !zh->close_requested) {
            pthread_cond_wait(&zh->completions_to_process.cond, &zh->completions_to_process.lock);
        }
        pthread_mutex_unlock(&zh->completions_to_process.lock);
        process_completions(zh);
    }
    api_epilog(zh, 0);    
    LOG_DEBUG(("completion thread terminated"));
    return 0;
}

就是只要completions_to_process队列不为空,就调用process_completions处理一下。接下来看一下process_completions是怎么实现的:

/**
 * Drain zh->completions_to_process.
 *
 * For every queued entry the reply header is decoded from the entry's
 * buffer. Watcher notifications are delivered through deliverWatchers();
 * every other reply is passed to deserialize_response(). The entry is then
 * destroyed and its archive closed.
 *
 * @param zh  client handle whose completion queue is processed
 */
void process_completions(zhandle_t *zh)
{
    for (;;) {
        struct ReplyHeader reply;
        buffer_list_t *raw;
        struct iarchive *archive;
        completion_list_t *entry = dequeue_completion(&zh->completions_to_process);

        if (entry == 0) {
            break;
        }

        raw = entry->buffer;
        archive = create_buffer_iarchive(raw->buffer, raw->len);
        deserialize_ReplyHeader(archive, "hdr", &reply);

        if (reply.xid != WATCHER_EVENT_XID) {
            /* reply to a request: decode it and invoke the callback */
            deserialize_response(entry->c.type, reply.xid, reply.err != 0, reply.err, entry, archive);
        } else {
            /* This is a notification so there aren't any pending requests */
            struct WatcherEvent event;
            int event_type;
            int event_state;

            deserialize_WatcherEvent(archive, "event", &event);
            event_type = event.type;
            event_state = event.state;
            LOG_DEBUG(("Calling a watcher for node [%s], type = %d event=%s",
                       (event.path==NULL?"NULL":event.path), entry->c.type,
                       watcherEvent2String(event_type)));
            deliverWatchers(zh,event_type,event_state,event.path, &entry->c.watcher_result);
            deallocate_WatcherEvent(&event);
        }

        destroy_completion_entry(entry);
        close_buffer_iarchive(&archive);
    }
}


deserialize_response的逻辑也比较直接,根据不同的请求类型,调用不同的回调。

/**
 * Decode a reply body according to the completion type and invoke the
 * callback stored in the completion entry.
 *
 * For the typed completions a failed reply invokes the callback with rc and
 * null/zero results; a successful reply is deserialized from `ia`, passed to
 * the callback and deallocated afterwards. COMPLETION_VOID invokes the
 * callback with rc only. COMPLETION_MULTI replaces rc with the result of
 * deserialize_multi() before invoking the callback.
 *
 * @param type    completion type (COMPLETION_*) selecting the callback shape
 * @param xid     xid of the reply; forwarded to deserialize_multi()
 * @param failed  non-zero when the reply carries an error
 * @param rc      error code from the reply header
 * @param cptr    completion entry holding the callback and its user data
 * @param ia      archive positioned after the reply header
 */
static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
{
    switch (type) {
    case COMPLETION_DATA:
        LOG_DEBUG(("Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        if (!failed) {
            struct GetDataResponse data_reply;
            deserialize_GetDataResponse(ia, "reply", &data_reply);
            cptr->c.data_result(rc, data_reply.data.buff, data_reply.data.len,
                    &data_reply.stat, cptr->data);
            deallocate_GetDataResponse(&data_reply);
        } else {
            cptr->c.data_result(rc, 0, 0, 0, cptr->data);
        }
        break;

    case COMPLETION_STAT:
        LOG_DEBUG(("Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        if (!failed) {
            struct SetDataResponse stat_reply;
            deserialize_SetDataResponse(ia, "reply", &stat_reply);
            cptr->c.stat_result(rc, &stat_reply.stat, cptr->data);
            deallocate_SetDataResponse(&stat_reply);
        } else {
            cptr->c.stat_result(rc, 0, cptr->data);
        }
        break;

    case COMPLETION_STRINGLIST:
        LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        if (!failed) {
            struct GetChildrenResponse children_reply;
            deserialize_GetChildrenResponse(ia, "reply", &children_reply);
            cptr->c.strings_result(rc, &children_reply.children, cptr->data);
            deallocate_GetChildrenResponse(&children_reply);
        } else {
            cptr->c.strings_result(rc, 0, cptr->data);
        }
        break;

    case COMPLETION_STRINGLIST_STAT:
        LOG_DEBUG(("Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        if (!failed) {
            struct GetChildren2Response children2_reply;
            deserialize_GetChildren2Response(ia, "reply", &children2_reply);
            cptr->c.strings_stat_result(rc, &children2_reply.children,
                    &children2_reply.stat, cptr->data);
            deallocate_GetChildren2Response(&children2_reply);
        } else {
            cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
        }
        break;

    case COMPLETION_STRING:
        LOG_DEBUG(("Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
                    cptr->xid, failed, rc));
        if (!failed) {
            struct CreateResponse create_reply;
            /* zeroed before deserializing, as in the original code */
            memset(&create_reply, 0, sizeof(create_reply));
            deserialize_CreateResponse(ia, "reply", &create_reply);
            cptr->c.string_result(rc, create_reply.path, cptr->data);
            deallocate_CreateResponse(&create_reply);
        } else {
            cptr->c.string_result(rc, 0, cptr->data);
        }
        break;

    case COMPLETION_ACLLIST:
        LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        if (!failed) {
            struct GetACLResponse acl_reply;
            deserialize_GetACLResponse(ia, "reply", &acl_reply);
            cptr->c.acl_result(rc, &acl_reply.acl, &acl_reply.stat, cptr->data);
            deallocate_GetACLResponse(&acl_reply);
        } else {
            cptr->c.acl_result(rc, 0, 0, cptr->data);
        }
        break;

    case COMPLETION_VOID:
        LOG_DEBUG(("Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        assert(cptr->c.void_result);
        cptr->c.void_result(rc, cptr->data);
        break;

    case COMPLETION_MULTI:
        LOG_DEBUG(("Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
                    cptr->xid, failed, rc));
        /* the overall result of a multi request comes from its sub-results */
        rc = deserialize_multi(xid, cptr, ia);
        assert(cptr->c.void_result);
        cptr->c.void_result(rc, cptr->data);
        break;

    default:
        LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
        break;
    }
}

读写数据

使用ZooKeeper API读写数据时,其实就是构造completion_list_t,添加到struct _zhandle里的sent_requests队列里,然后等待上面的IO线程把数据发送出去。等待数据返回后,找到相应的completion_list_t,然后调用回调。

先看到这里吧,日后有需求再深入看一下。

    原文作者:Jiang阿涵
    原文地址: https://www.jianshu.com/p/03067f6c6546
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞