Notes on the ZooKeeper C client
Overall impression: the code is fairly messy. The style is inconsistent, functions have multiple exit points instead of using goto for centralized error handling, and comments are sparse, so it is not particularly pleasant to read.
Following the code requires a reasonable grasp of Linux system calls, and in particular of the poll I/O model.
For simplicity, the Windows-specific macro definitions and code have been removed.
The handle (struct _zhandle) represents one client connection; it is defined in zk_adaptor.h:
struct _zhandle {
int fd; /* the descriptor used to talk to zookeeper */
char *hostname; /* the hostname of zookeeper */
struct sockaddr_storage *addrs; /* the addresses that correspond to the hostname */
int addrs_count; /* The number of addresses in the addrs array */
watcher_fn watcher; /* the registered watcher */
struct timeval last_recv; /* The time that the last message was received */
struct timeval last_send; /* The time that the last message was sent */
struct timeval last_ping; /* The time that the last PING was sent */
struct timeval next_deadline; /* The time of the next deadline */
int recv_timeout; /* The maximum amount of time that can go by without
receiving anything from the zookeeper server */
buffer_list_t *input_buffer; /* the current buffer being read in */
buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
buffer_head_t to_send; /* The packets queued to send */
completion_head_t sent_requests; /* The outstanding requests */
completion_head_t completions_to_process; /* completions that are ready to run */
int connect_index; /* The index of the address to connect to */
clientid_t client_id;
long long last_zxid;
int outstanding_sync; /* Number of outstanding synchronous requests */
struct _buffer_list primer_buffer; /* The buffer used for the handshake at the start of a connection */
struct prime_struct primer_storage; /* the connect response */
char primer_storage_buffer[40]; /* the true size of primer_storage */
volatile int state;
void *context;
auth_list_head_t auth_h; /* authentication data list */
/* zookeeper_close is not reentrant because it de-allocates the zhandler.
* This guard variable is used to defer the destruction of zhandle till
* right before top-level API call returns to the caller */
int32_t ref_counter;
volatile int close_requested;
void *adaptor_priv;
/* Used for debugging only: non-zero value indicates the time when the zookeeper_process
* call returned while there was at least one unprocessed server response
* available in the socket recv buffer */
struct timeval socket_readable;
zk_hashtable* active_node_watchers;
zk_hashtable* active_exist_watchers;
zk_hashtable* active_child_watchers;
/** used for chroot path at the client side **/
char *chroot;
};
When the ZooKeeper client is initialized, it starts two threads: an IO thread and a completion thread. The IO thread's entry function is do_io, located in mt_adaptor.c.
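For orientation, here is a minimal sketch of how those two threads get started. The real code is in adaptor_init()/start_threads() in mt_adaptor.c; the struct and field names below are assumptions reconstructed from memory, not verbatim source:
/* Hedged sketch of thread startup; struct/field names here are assumed. */
#include <pthread.h>
struct adaptor_threads_sketch {
    pthread_t io;          /* runs do_io() */
    pthread_t completion;  /* runs do_completion() */
    int self_pipe[2];      /* pipe used to wake the IO thread out of poll() */
};
static int start_threads_sketch(zhandle_t *zh)
{
    struct adaptor_threads_sketch *adaptor = zh->adaptor_priv;
    /* IO thread: owns the socket and runs the poll() loop in do_io(). */
    pthread_create(&adaptor->io, 0, do_io, zh);
    /* Completion thread: waits on completions_to_process and runs callbacks. */
    pthread_create(&adaptor->completion, 0, do_completion, zh);
    /* The real code also blocks here until both threads report ready
     * (see notify_thread_ready() at the top of each thread function). */
    return 0;
}
The self_pipe is the same pipe that do_io polls at fds[0] below; writing a byte to it is how other threads break the IO thread out of poll().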
The IO thread
The source of do_io is as follows (with the Windows-specific code removed):
void *do_io(void *v)
{
zhandle_t *zh = (zhandle_t*)v;
struct pollfd fds[2];
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG(("started IO thread"));
fds[0].fd=adaptor_threads->self_pipe[0];
fds[0].events=POLLIN;
while(!zh->close_requested) {
struct timeval tv;
int fd;
int interest;
int timeout;
int maxfd=1;
zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
fds[1].fd=fd;
fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
maxfd=2;
}
timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);
poll(fds,maxfd,timeout);
if (fd != -1) {
interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
}
if(fds[0].revents&POLLIN){
// flush the pipe
char b[128];
while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
}
// dispatch zookeeper events
zookeeper_process(zh, interest);
// check the current state of the zhandle and terminate
// if it is_unrecoverable()
if(is_unrecoverable(zh))
break;
}
api_epilog(zh, 0);
LOG_DEBUG(("IO thread terminated"));
return 0;
}
zookeeper_process
int zookeeper_process(zhandle_t *zh, int events)
{
buffer_list_t *bptr;
int rc;
if (zh==NULL)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
api_prolog(zh);
IF_DEBUG(checkResponseLatency(zh));
// Depending on the handle state and the events, read and/or send data.
// On a fresh connection this sends the create-session request;
// when the create-session ack arrives, the state is set to connected.
// All other responses read here are queued on zh->to_process for the loop below.
rc = check_events(zh, events);
if (rc!=ZOK)
return api_epilog(zh, rc);
IF_DEBUG(isSocketReadable(zh));
while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
struct ReplyHeader hdr;
struct iarchive *ia = create_buffer_iarchive(
bptr->buffer, bptr->curr_offset);
deserialize_ReplyHeader(ia, "hdr", &hdr);
if (hdr.zxid > 0) {
zh->last_zxid = hdr.zxid;
} else {
// fprintf(stderr, "Got %#x for %#x\n", hdr.zxid, hdr.xid);
}
if (hdr.xid == PING_XID) {
// Ping replies can arrive out-of-order
int elapsed = 0;
struct timeval now;
gettimeofday(&now, 0);
elapsed = calculate_interval(&zh->last_ping, &now);
LOG_DEBUG(("Got ping response in %d ms", elapsed));
free_buffer(bptr);
} else if (hdr.xid == WATCHER_EVENT_XID) {
struct WatcherEvent evt;
int type = 0;
char *path = NULL;
completion_list_t *c = NULL;
LOG_DEBUG(("Processing WATCHER_EVENT"));
deserialize_WatcherEvent(ia, "event", &evt);
type = evt.type;
path = evt.path;
/* We are doing a notification, so there is no pending request */
c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
c->buffer = bptr;
c->c.watcher_result = collectWatchers(zh, type, path);
// We cannot free until now, otherwise path will become invalid
deallocate_WatcherEvent(&evt);
queue_completion(&zh->completions_to_process, c, 0);
} else if (hdr.xid == SET_WATCHES_XID) {
LOG_DEBUG(("Processing SET_WATCHES"));
free_buffer(bptr);
} else if (hdr.xid == AUTH_XID){
LOG_DEBUG(("Processing AUTH_XID"));
/* special handling for the AUTH response as it may come back
* out-of-band */
auth_completion_func(hdr.err,zh);
free_buffer(bptr);
/* authentication completion may change the connection state to
* unrecoverable */
if(is_unrecoverable(zh)){
handle_error(zh, ZAUTHFAILED);
close_buffer_iarchive(&ia);
return api_epilog(zh, ZAUTHFAILED);
}
} else {
int rc = hdr.err;
/* Find the request corresponding to the response */
completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
/* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
if (zh->close_requested == 1 && cptr == NULL) {
LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
close_buffer_iarchive(&ia);
free_buffer(bptr);
return api_epilog(zh,ZINVALIDSTATE);
}
assert(cptr);
/* The requests are going to come back in order */
if (cptr->xid != hdr.xid) {
LOG_DEBUG(("Processing unexpected or out-of-order response!"));
// received unexpected (or out-of-order) response
close_buffer_iarchive(&ia);
free_buffer(bptr);
// put the completion back on the queue (so it gets properly
// signaled and deallocated) and disconnect from the server
queue_completion(&zh->sent_requests,cptr,1);
return api_epilog(zh,
handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
"unexpected server response: expected %#x, but received %#x",
hdr.xid,cptr->xid));
}
activateWatcher(zh, cptr->watcher, rc);
// asynchronous call: hand the completion to the completion thread
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
LOG_DEBUG(("Queueing asynchronous response"));
cptr->buffer = bptr;
queue_completion(&zh->completions_to_process, cptr, 0);
} else {
// synchronous call: fill in the result and wake up the waiting caller
struct sync_completion
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;
process_sync_completion(cptr, sc, ia, zh);
notify_sync_completion(sc);
free_buffer(bptr);
zh->outstanding_sync--;
destroy_completion_entry(cptr);
}
}
close_buffer_iarchive(&ia);
}
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
return api_epilog(zh,ZOK);
}
check_events:
// If the connection is not yet established, finish the connect and send the handshake data.
// If there is data queued to send, send it.
// If there is data to read, read it and process it.
// (The name check_events feels like a poor fit for everything this function does.)
static int check_events(zhandle_t *zh, int events)
{
if (zh->fd == -1)
return ZINVALIDSTATE;
// still in the connecting state
if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
int rc, error;
socklen_t len = sizeof(error);
rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
/* the description in section 16.4 "Non-blocking connect"
* in UNIX Network Programming vol 1, 3rd edition, points out
* that sometimes the error is in errno and sometimes in error */
if (rc < 0 || error) {
if (rc == 0)
errno = error;
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"server refused to accept the client");
}
if((rc=prime_connection(zh))!=0)
return rc;
LOG_INFO(("initiated connection to server [%s]",
format_endpoint_info(&zh->addrs[zh->connect_index])));
return ZOK;
}
if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
/* make the flush call non-blocking by specifying a 0 timeout */
int rc=flush_send_queue(zh,0);
if (rc < 0)
return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
"failed while flushing send queue");
}
if (events&ZOOKEEPER_READ) {
int rc;
if (zh->input_buffer == 0) {
zh->input_buffer = allocate_buffer(0,0);
}
// read data: returns -1 on failure, 0 if the read would block, 1 on success
rc = recv_buffer(zh->fd, zh->input_buffer);
if (rc < 0) {
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"failed while receiving a server response");
}
if (rc > 0) {
gettimeofday(&zh->last_recv, 0);
// if this is not the handshake response, queue it for processing
if (zh->input_buffer != &zh->primer_buffer) {
queue_buffer(&zh->to_process, zh->input_buffer, 0);
} else { // this is the handshake (connect) response
int64_t oldid,newid;
// deserialize the received bytes (zh->primer_buffer.buffer) into zh->primer_storage
deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
/* We are processing the primer_buffer, so we need to finish
* the connection handshake */
oldid = zh->client_id.client_id;
newid = zh->primer_storage.sessionId;
// the session id returned by the server differs from the one we sent
if (oldid != 0 && oldid != newid) {
zh->state = ZOO_EXPIRED_SESSION_STATE;
errno = ESTALE;
return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
"sessionId=%#llx has expired.",oldid);
} else {
zh->recv_timeout = zh->primer_storage.timeOut;
zh->client_id.client_id = newid;
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
sizeof(zh->client_id.passwd));
// set the state to connected
zh->state = ZOO_CONNECTED_STATE;
LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
format_endpoint_info(&zh->addrs[zh->connect_index]),
newid, zh->recv_timeout));
/* we want the auth to be sent for, but since both call push to front
we need to call send_watch_set first */
send_set_watches(zh);
/* send the authentication packet now */
send_auth_info(zh);
LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
}
}
zh->input_buffer = 0;
} else {
// zookeeper_process was called but there was nothing to read
// from the socket
return ZNOTHING;
}
}
return ZOK;
}
prime_connection performs the initial handshake with the ZooKeeper server:
static int prime_connection(zhandle_t *zh)
{
int rc;
/*this is the size of buffer to serialize req into*/
char buffer_req[HANDSHAKE_REQ_SIZE];
int len = sizeof(buffer_req);
int hlen = 0;
// build the connect request
struct connect_req req;
req.protocolVersion = 0;
req.sessionId = zh->client_id.client_id;
req.passwd_len = sizeof(req.passwd);
memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
req.timeOut = zh->recv_timeout;
req.lastZxidSeen = zh->last_zxid;
hlen = htonl(len);
/* We are running fast and loose here, but this string should fit in the initial buffer! */
// send the length prefix
rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
// serialize the connect request
serialize_prime_connect(&req, buffer_req);
// send the connect request itself
rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
if (rc<0) {
return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
"failed to send a handshake packet: %s", strerror(errno));
}
// move the client into the ASSOCIATING state
zh->state = ZOO_ASSOCIATING_STATE;
zh->input_buffer = &zh->primer_buffer;
/* This seems a bit weird to to set the offset to 4, but we already have a
* length, so we skip reading the length (and allocating the buffer) by
* saying that we are already at offset 4 */
zh->input_buffer->curr_offset = 4;
return ZOK;
}
The completion thread
Overall, the completion thread is much simpler than the IO thread. Its entry function is:
void *do_completion(void *v)
{
zhandle_t *zh = v;
api_prolog(zh);
notify_thread_ready(zh);
LOG_DEBUG(("started completion thread"));
while(!zh->close_requested) {
pthread_mutex_lock(&zh->completions_to_process.lock);
while(!zh->completions_to_process.head && !zh->close_requested) {
pthread_cond_wait(&zh->completions_to_process.cond, &zh->completions_to_process.lock);
}
pthread_mutex_unlock(&zh->completions_to_process.lock);
process_completions(zh);
}
api_epilog(zh, 0);
LOG_DEBUG(("completion thread terminated"));
return 0;
}
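A side note: the pthread_cond_wait above is woken by the IO thread. When zookeeper_process queues a completion onto completions_to_process (queue_completion in the code above), the queue's condition variable is signaled while its mutex is held. Here is a minimal sketch of that producer side, with simplified and partly assumed names (the real list fields and locking helpers should be checked in zookeeper.c / mt_adaptor.c):
/* Hedged sketch of the producer side; 'last' and 'next' are assumed field names,
 * and the real queue_completion also takes an add_to_front flag. */
static void queue_completion_sketch(completion_head_t *list, completion_list_t *c)
{
    pthread_mutex_lock(&list->lock);
    /* append c to the tail of the completion list */
    c->next = 0;
    if (list->last)
        list->last->next = c;
    else
        list->head = c;
    list->last = c;
    /* wake do_completion(), which may be blocked in pthread_cond_wait() */
    pthread_cond_broadcast(&list->cond);
    pthread_mutex_unlock(&list->lock);
}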
In short, whenever the completions_to_process queue is non-empty, the thread calls process_completions. Next, let's look at how process_completions is implemented:
void process_completions(zhandle_t *zh)
{
completion_list_t *cptr;
while ((cptr = dequeue_completion(&zh->completions_to_process)) != 0) {
struct ReplyHeader hdr;
buffer_list_t *bptr = cptr->buffer;
struct iarchive *ia = create_buffer_iarchive(bptr->buffer,
bptr->len);
deserialize_ReplyHeader(ia, "hdr", &hdr);
if (hdr.xid == WATCHER_EVENT_XID) {
int type, state;
struct WatcherEvent evt;
deserialize_WatcherEvent(ia, "event", &evt);
/* We are doing a notification, so there is no pending request */
type = evt.type;
state = evt.state;
/* This is a notification so there aren't any pending requests */
LOG_DEBUG(("Calling a watcher for node [%s], type = %d event=%s",
(evt.path==NULL?"NULL":evt.path), cptr->c.type,
watcherEvent2String(type)));
deliverWatchers(zh,type,state,evt.path, &cptr->c.watcher_result);
deallocate_WatcherEvent(&evt);
} else {
// the main logic is here: dispatch to the per-type callback
deserialize_response(cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
}
destroy_completion_entry(cptr);
close_buffer_iarchive(&ia);
}
}
The logic of deserialize_response is also straightforward: based on the completion type, it deserializes the reply and invokes the corresponding callback.
static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
{
switch (type) {
case COMPLETION_DATA:
LOG_DEBUG(("Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
if (failed) {
cptr->c.data_result(rc, 0, 0, 0, cptr->data);
} else {
struct GetDataResponse res;
deserialize_GetDataResponse(ia, "reply", &res);
cptr->c.data_result(rc, res.data.buff, res.data.len,
&res.stat, cptr->data);
deallocate_GetDataResponse(&res);
}
break;
case COMPLETION_STAT:
LOG_DEBUG(("Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
if (failed) {
cptr->c.stat_result(rc, 0, cptr->data);
} else {
struct SetDataResponse res;
deserialize_SetDataResponse(ia, "reply", &res);
cptr->c.stat_result(rc, &res.stat, cptr->data);
deallocate_SetDataResponse(&res);
}
break;
case COMPLETION_STRINGLIST:
LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
if (failed) {
cptr->c.strings_result(rc, 0, cptr->data);
} else {
struct GetChildrenResponse res;
deserialize_GetChildrenResponse(ia, "reply", &res);
cptr->c.strings_result(rc, &res.children, cptr->data);
deallocate_GetChildrenResponse(&res);
}
break;
case COMPLETION_STRINGLIST_STAT:
LOG_DEBUG(("Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
if (failed) {
cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
} else {
struct GetChildren2Response res;
deserialize_GetChildren2Response(ia, "reply", &res);
cptr->c.strings_stat_result(rc, &res.children, &res.stat, cptr->data);
deallocate_GetChildren2Response(&res);
}
break;
case COMPLETION_STRING:
LOG_DEBUG(("Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
cptr->xid, failed, rc));
if (failed) {
cptr->c.string_result(rc, 0, cptr->data);
} else {
struct CreateResponse res;
memset(&res, 0, sizeof(res));
deserialize_CreateResponse(ia, "reply", &res);
cptr->c.string_result(rc, res.path, cptr->data);
deallocate_CreateResponse(&res);
}
break;
case COMPLETION_ACLLIST:
LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
if (failed) {
cptr->c.acl_result(rc, 0, 0, cptr->data);
} else {
struct GetACLResponse res;
deserialize_GetACLResponse(ia, "reply", &res);
cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
deallocate_GetACLResponse(&res);
}
break;
case COMPLETION_VOID:
LOG_DEBUG(("Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
assert(cptr->c.void_result);
cptr->c.void_result(rc, cptr->data);
break;
case COMPLETION_MULTI:
LOG_DEBUG(("Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
cptr->xid, failed, rc));
rc = deserialize_multi(xid, cptr, ia);
assert(cptr->c.void_result);
cptr->c.void_result(rc, cptr->data);
break;
default:
LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
}
}
Reading and writing data
When you read or write data through the ZooKeeper API, the call essentially builds a completion_list_t, appends it to the sent_requests queue in struct _zhandle, queues the serialized request bytes on to_send, and lets the IO thread described above send them. When the response comes back, the matching completion_list_t is located and its callback is invoked. A hedged sketch of that request path follows.
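To make this concrete, here is a hedged sketch of the shape of an asynchronous get. It mirrors what zoo_aget/zoo_awget do in zookeeper.c, but it is simplified and written from memory; helper names such as add_data_completion and queue_buffer_bytes should be checked against the real source:
/* Hedged sketch of an async request path (simplified, not verbatim zookeeper.c). */
int zoo_aget_sketch(zhandle_t *zh, const char *path, int watch,
                    data_completion_t completion, const void *data)
{
    struct oarchive *oa = create_buffer_oarchive();
    struct RequestHeader h = { get_xid(), ZOO_GETDATA_OP };
    struct GetDataRequest req = { (char*)path, watch };
    int rc;
    /* serialize header + request body into an output archive */
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
    /* remember the pending request: build a completion_list_t holding this
     * xid and the user callback, and append it to zh->sent_requests */
    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, completion, data, 0);
    /* queue the serialized bytes on zh->to_send; the IO thread's poll()
     * loop flushes them via flush_send_queue() */
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send,
                                          get_buffer(oa), get_buffer_len(oa));
    close_buffer_oarchive(&oa, 0);
    /* nudge the IO thread in case it is blocked in poll() without write interest */
    adaptor_send_queue(zh, 0);
    return rc;
}
The real functions also handle the chroot prefix and watcher registration, which this sketch skips.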
That's as far as I'll go for now; I'll dig deeper if the need arises.