首先,ZooKeeper的session的关闭是由Leader来执行的,因为session的关闭也算是事务之一。至于Leader如何知道所有follower上面管理的session,这个请参考我的另一篇文章:ZooKeeper是如何管理Session的。
Leader上,ZooKeeper的session管理具体是由SessionTrackerImpl来负责的。在检查到session超时后,ZooKeeperServer提交了一个关闭session的事务请求。
/**
* @param cnxn
* @param sessionId
* @param xid
* @param bb
*/
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
提交请求后,这个请求就进入ZooKeeper的请求处理链中了。第一个处理这个请示的是处理器PreRequestProcessor。
经过简单的消息流转,请示到达pRequest2Txn这个函数。
case OpCode.closeSession:
// We don't want to do this check since the session expiration thread
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
HashSet<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path2Delete, null, 0, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
LOG.info("Processed session termination for sessionid: 0x"
+ Long.toHexString(request.sessionId));
break;
这段代码,主要就是将此session相关的临时节点,挨个添加一个stat为null的ChangeRecord。
然后通知sessionTracker,将session的状态设置为关闭中。
接下来这个closeSession的请求,就跟正常的写请求差不多,在各个处理器之间流转。最后到达FinalRequestProcessor,
调用ZooKeeperServer.processTxn->ZkDatabase.processTxn->DataTree.processTxn->DataTree.killSession->DataTree.deleteNode。
最终将Session相关的临时节点删除。
注:文中的说明及代码基于ZooKeeper 3.4.11。