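Preface: This article focuses on the important code inside ZooKeeper. This installment covers the Watcher mechanism and how to use Watchers to monitor the state of a cluster. If you are not yet familiar with installing, configuring, and using ZooKeeper, see the previous two articles: (1) ZooKeeper Tutorial (1): Quick Start, and Implementing a Distributed Barrier and Queue in Java (2) ZooKeeper Tutorial (2): Combining ZooKeeper with Dubbo, and How It Works
(I) How Watchers work: principle and source code analysis
(1) Principle:
A client registers a Watcher with the server. When a specific event occurs on the server, that Watcher is triggered, and the server sends an event notification to the client that registered it. The principle is simple enough, so how is it implemented?
(2) Walking through the source code
The Watcher mechanism has three parts: the client, the client-side watch manager, and the ZooKeeper server. When the client registers a Watcher with the server, it also stores the Watcher object in its local watch manager. When a Watcher event fires on the server, the server sends a notification to the client, which then looks up the matching Watcher objects and executes them. First, here is part of the server-side WatchManager class, which the server uses to store registered Watchers (the client-side counterpart, ZKWatchManager, is covered later):
(3) The WatchManager class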
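To make the register, trigger, notify cycle concrete, here is a toy, single-process model in Java. It is not ZooKeeper code: ToyServer, register and fire are hypothetical names, and a Consumer<String> stands in for a Watcher. It only sketches the shape of the protocol, including the one-shot behavior described in the server-side section of this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of register -> trigger -> notify, all in one process.
// ToyServer, register and fire are hypothetical names, not ZooKeeper APIs.
class ToyServer {
    // path -> callbacks registered by clients; each registration fires at most once
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // 1. A client registers a Watcher-like callback on a path.
    synchronized void register(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // 2. An event happens on the server for that path.
    void fire(String path, String eventType) {
        List<Consumer<String>> toNotify;
        synchronized (this) {
            toNotify = watches.remove(path); // removed before notifying: one-shot
        }
        if (toNotify == null) {
            return; // nobody was watching this path
        }
        // 3. Every client that registered on the path receives a notification.
        for (Consumer<String> callback : toNotify) {
            callback.accept(eventType + " " + path);
        }
    }
}

class ToyDemo {
    public static void main(String[] args) {
        ToyServer server = new ToyServer();
        List<String> received = new ArrayList<>();
        server.register("/app/config", received::add);
        server.fire("/app/config", "NodeDataChanged");
        server.fire("/app/config", "NodeDataChanged"); // no watch left, nothing delivered
        System.out.println(received); // prints [NodeDataChanged /app/config]
    }
}
```

The second fire delivers nothing because the first one consumed the registration. A client that wants continuous notifications has to register again, which is exactly what the cluster monitor example at the end of this article does.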
public class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
public synchronized int size(){
int result = 0;
for(Set<Watcher> watches : watchTable.values()) {
result += watches.size();
}
return result;
}
//synchronized method, for thread safety
public synchronized void addWatch(String path, Watcher watcher) {
//first look up the set of Watchers already registered on this path
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
}
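Why keep two maps? watchTable answers "who is watching this path" when an event fires. watch2Paths answers "which paths does this watcher (in practice, one client connection) watch", which is what the server needs when a connection closes. The sketch below is a generic simplification that assumes a watcher can be any hashable object. The class name TwoWayWatchIndex is hypothetical; its removeWatcher follows the idea of WatchManager.removeWatcher in ZooKeeper 3.4.x.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified two-way index in the spirit of the server's WatchManager.
// W stands in for the Watcher type (on the server it is the client's ServerCnxn).
class TwoWayWatchIndex<W> {
    private final Map<String, Set<W>> watchTable = new HashMap<>();   // path -> watchers
    private final Map<W, Set<String>> watch2Paths = new HashMap<>();  // watcher -> paths

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>(4)).add(watcher);
        watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // When a connection goes away, watch2Paths lists exactly the paths to clean,
    // so there is no need to scan every entry of watchTable.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<W> set = watchTable.get(p);
            if (set != null) {
                set.remove(watcher);
                if (set.isEmpty()) {
                    watchTable.remove(p); // do not keep empty sets around
                }
            }
        }
    }

    // Total number of (path, watcher) registrations, like size() above.
    synchronized int size() {
        int n = 0;
        for (Set<W> s : watchTable.values()) {
            n += s.size();
        }
        return n;
    }
}
```

For example, after registering "cnxn1" on /a and /b and "cnxn2" on /a, calling removeWatcher("cnxn1") leaves a single registration.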
(4) The Watcher interface (the full code is long, so only the important parts are shown)
public interface Watcher {
public interface Event {
// ......... (the KeeperState and EventType enums are omitted)
}
abstract public void process(WatchedEvent event);
}
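Event contains two enums, KeeperState and EventType. KeeperState describes the state of the client's connection and session (for example SyncConnected, Disconnected, Expired), and EventType describes what happened to a node (for example None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged).
The process method is a callback (if callbacks are unfamiliar, look them up or ask in the comments below and I will explain further). When ZooKeeper sends a Watcher event notification to the client, the client calls the corresponding process method. The argument of process is a WatchedEvent, a class examined in the next section.
(5) The WatchedEvent class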
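A typical process implementation branches on these two enums. The sketch below redefines a subset of them (same names and int codes as ZooKeeper's Watcher.Event.KeeperState and Watcher.Event.EventType) only so that it compiles without the ZooKeeper jar. DispatchingWatcher is a hypothetical class, not a ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.List;

// Mirror enums: a subset of ZooKeeper's KeeperState / EventType, redefined for this sketch.
enum KeeperState {
    Disconnected(0), SyncConnected(3), AuthFailed(4), Expired(-112);
    final int code;
    KeeperState(int code) { this.code = code; }
}

enum EventType {
    None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4);
    final int code;
    EventType(int code) { this.code = code; }
}

// A process()-style callback: branch on the event type first and, for
// connection-level events (type None), on the connection state.
class DispatchingWatcher {
    final List<String> log = new ArrayList<>();

    void process(KeeperState state, EventType type, String path) {
        if (type == EventType.None) {
            // Session/connection change; such events carry no node path.
            switch (state) {
                case SyncConnected: log.add("connected"); break;
                case Disconnected: log.add("disconnected, the client library will try to reconnect"); break;
                case Expired: log.add("session expired, a new ZooKeeper handle is needed"); break;
                default: log.add("state " + state); break;
            }
            return;
        }
        // Node-level event. The watch has already been consumed (one-shot),
        // so real code would re-register here if it still cares about the node.
        log.add(type + " on " + path);
    }
}
```

Checking the type before the state keeps connection handling separate from node handling. The connectionWatcher in the cluster monitor example at the end of this article uses the same test (type None plus state SyncConnected).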
public class WatchedEvent {
final private KeeperState keeperState;
final private EventType eventType;
private String path;
//constructors and getters omitted
/** * Convert WatchedEvent to type that can be sent over network */
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(),
keeperState.getIntValue(),
path);
}
}
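As shown above, WatchedEvent has three properties: the notification state (KeeperState), the event type (EventType), and the node path (path). ZooKeeper uses WatchedEvent objects to encapsulate server-side events and pass them to Watchers, so that the process callback can handle them.
Note that getWrapper returns a WatcherEvent object. It represents the same event as WatchedEvent, and both wrap a server-side event. The difference is that WatchedEvent is a logical event, an object used by the server and client programs while they run. The source of WatcherEvent, by contrast, shows that it implements a serialization interface (Jute's Record), so it can be sent over the network.
So the whole flow is: the server calls getWrapper to wrap the event in a WatcherEvent and sends it. When the client receives this object, it first converts the WatcherEvent back into a WatchedEvent and passes it to process, and the process callback can then reconstruct the complete server-side event from it.
That covers how Watchers are stored and how clients are notified. Next comes the client-side Watcher registration flow.
(6) Client-side Watcher registration. Start with this line of code: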
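The round trip can be sketched with plain JDK types. LogicalEvent plays the role of WatchedEvent (typed enums for program logic) and WireEvent plays WatcherEvent (ints and a string that can be serialized). Both are hypothetical stand-ins. The field order type, state, path follows the WatcherEvent(type, state, path) call in getWrapper above, but the byte layout produced here by DataOutputStream is an assumption and is not Jute's actual encoding. The client-side fromWire mirrors ZooKeeper's WatchedEvent(WatcherEvent) constructor, which maps the int codes back with KeeperState.fromInt and EventType.fromInt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Subset of the real enums with the same int codes, plus fromInt for decoding.
enum KeeperState {
    Disconnected(0), SyncConnected(3), Expired(-112);
    final int code;
    KeeperState(int code) { this.code = code; }
    static KeeperState fromInt(int code) {
        for (KeeperState s : values()) if (s.code == code) return s;
        throw new IllegalArgumentException("unknown state " + code);
    }
}

enum EventType {
    None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4);
    final int code;
    EventType(int code) { this.code = code; }
    static EventType fromInt(int code) {
        for (EventType t : values()) if (t.code == code) return t;
        throw new IllegalArgumentException("unknown type " + code);
    }
}

// Wire form (role of WatcherEvent): only ints and a string, easy to serialize.
final class WireEvent {
    final int type;
    final int state;
    final String path;
    WireEvent(int type, int state, String path) { this.type = type; this.state = state; this.path = path; }

    byte[] serialize() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(type);
        out.writeInt(state);
        out.writeBoolean(path != null); // connection events carry no path
        if (path != null) out.writeUTF(path);
        out.flush();
        return bytes.toByteArray();
    }

    static WireEvent deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int type = in.readInt();
        int state = in.readInt();
        String path = in.readBoolean() ? in.readUTF() : null;
        return new WireEvent(type, state, path);
    }
}

// Logical form (role of WatchedEvent): typed enums used by program logic.
final class LogicalEvent {
    final KeeperState state;
    final EventType type;
    final String path;
    LogicalEvent(KeeperState state, EventType type, String path) { this.state = state; this.type = type; this.path = path; }

    // Server side, like getWrapper(): flatten enums into int codes.
    WireEvent getWrapper() { return new WireEvent(type.code, state.code, path); }

    // Client side: map the int codes back to enums.
    static LogicalEvent fromWire(WireEvent w) {
        return new LogicalEvent(KeeperState.fromInt(w.state), EventType.fromInt(w.type), w.path);
    }
}
```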
zk = new ZooKeeper(HostPort,2000,connectionWatcher);
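In this line, the third argument is a Watcher that serves as the default Watcher for the entire ZooKeeper session. It is kept in the defaultWatcher field of the client's ZKWatchManager. ZKWatchManager is a static inner class of ZooKeeper, and part of its code is: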
private static class ZKWatchManager implements ClientWatchManager {
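//Watchers for data changes (set via getData, or via exists on an existing node)
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
//Watchers set via exists on a node that does not exist yet
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
//Watchers for child-node changes (set via getChildren)
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();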
private volatile Watcher defaultWatcher;
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
}
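When a notification arrives, the client must decide which of these maps to consult. The sketch below follows the routing of ZKWatchManager.materialize in ZooKeeper 3.4.x as we understand it, assuming automatic watch reset has not been disabled. ClientWatchRouter, the String-typed watchers and the method name route are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side router: which registered watchers get an incoming event.
class ClientWatchRouter {
    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();
    final Map<String, Set<String>> childWatches = new HashMap<>();
    final String defaultWatcher = "default";

    private static void addTo(Set<String> from, Set<String> to) {
        if (from != null) to.addAll(from);
    }

    synchronized Set<String> route(String type, String path) {
        Set<String> result = new HashSet<>();
        switch (type) {
            case "None":
                // Connection-state event: the default watcher plus every registered
                // watcher is told; nothing is removed in this sketch.
                result.add(defaultWatcher);
                for (Set<String> ws : dataWatches.values()) result.addAll(ws);
                for (Set<String> ws : existWatches.values()) result.addAll(ws);
                for (Set<String> ws : childWatches.values()) result.addAll(ws);
                break;
            case "NodeCreated":
            case "NodeDataChanged":
                // remove(...) is what makes client-side registrations one-shot
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
                break;
            case "NodeChildrenChanged":
                addTo(childWatches.remove(path), result);
                break;
            case "NodeDeleted":
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
                addTo(childWatches.remove(path), result);
                break;
            default:
                throw new IllegalArgumentException("unknown event type " + type);
        }
        return result;
    }
}
```

The default Watcher passed to the ZooKeeper constructor is therefore mainly the receiver of connection-state events, while node events go to the watchers registered on that path.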
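Almost all client requests go through ClientCnxn. When a request is sent with a Watcher, the client marks the request as one that sets a watch (the watch flag of the request, see request.setWatch in getChildren below). It also creates a WatchRegistration object that temporarily holds the mapping between the data node path and the Watcher.
In ZooKeeper, a Packet is the smallest unit of the communication protocol, i.e. a data packet. Packets carry data between client and server over the network, and any object that has to be transmitted must be wrapped in a Packet. In ClientCnxn, the WatchRegistration is also stored in the Packet, but it stays on the client: only the request header and the request body are serialized. queuePacket puts the Packet into the outgoing queue, where it waits for the SendThread to send it. This is again an asynchronous process, and asynchronous communication is a widely accepted practice in distributed systems. Later, the SendThread receives the server's response in its readResponse method and asynchronously calls finishPacket, which takes the Watcher out of the Packet and registers it with ZKWatchManager: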
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
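The point of this design is ordering: the Watcher is registered locally only after the server's reply is known. The single-threaded simulation below shows that ordering. Packet and MiniCnxn are hypothetical simplifications of ClientCnxn, with no real threads or sockets. The IntConsumer plays the role of WatchRegistration.register(rc). The FIFO pending queue reflects the fact that replies come back in request order, so the oldest pending packet is the one being answered.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntConsumer;

// Simplified Packet: the registration is kept locally, never serialized.
class Packet {
    final String path;
    final IntConsumer watchRegistration; // like WatchRegistration.register(rc); may be null
    int err;
    boolean finished;

    Packet(String path, IntConsumer watchRegistration) {
        this.path = path;
        this.watchRegistration = watchRegistration;
    }
}

class MiniCnxn {
    private final Queue<Packet> outgoing = new ArrayDeque<>();
    private final Queue<Packet> pending = new ArrayDeque<>();

    // Caller side: enqueue and return immediately (asynchronous).
    void queuePacket(Packet p) {
        outgoing.add(p);
    }

    // Plays the SendThread: "send" one packet and remember it as awaiting a reply.
    void sendOne() {
        Packet p = outgoing.poll();
        if (p != null) pending.add(p);
    }

    // Plays readResponse: replies arrive in order, so the oldest pending packet matches.
    void readResponse(int err) {
        Packet p = pending.poll();
        if (p == null) throw new IllegalStateException("reply without a pending packet");
        p.err = err;
        finishPacket(p);
    }

    // Like finishPacket above: hand the result code to the registration, then complete.
    private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.accept(p.err);
        }
        p.finished = true; // the real code also notifies a waiting synchronous caller
    }
}
```

Neither queuing nor sending touches the registration. Only readResponse, via finishPacket, lets it decide (based on the result code) whether to record the Watcher.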
Watchers can be registered with the ZooKeeper server through three APIs: getData, getChildren, and exists. Let us take getChildren as an example:
public List<String> getChildren(final String path, Watcher watcher,
Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildren2);
GetChildren2Request request = new GetChildren2Request();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildren2Response response = new GetChildren2Response();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getChildren();
}
Next, the register code:
/**
 * Register the watcher with the set of watches on path.
 * @param rc the result code of the operation that attempted to
 * add the watch on the path.
 */
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
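In this method, the client hands the Watcher object to ZKWatchManager, which stores it in one of its Map fields so that data node paths are mapped to their Watcher objects. Which map is used depends on the registration type: getData registers into dataWatches, while the getChildren example above creates a ChildWatchRegistration, whose getWatches returns childWatches.
(7) Server-side Watcher handling
[Figure: sequence diagram of the server-side Watcher handling flow]
Below is a concise walk-through of the main code.
When FinalRequestProcessor receives a client request, its processRequest method handles it and passes it on to ZooKeeperServer's processTxn for further processing. The result is returned by the ZKDatabase class.
FinalRequestProcessor.processRequest (excerpt):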
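The condition inside register matters more than it looks. In ZooKeeper 3.4.x, the base WatchRegistration.shouldAddWatch(rc) registers only when rc == 0. The exists() variant also registers when the node does not exist (KeeperException.Code.NONODE, value -101), and in that case stores the watch in existWatches instead of dataWatches. The classes below are hypothetical simplifications that model this with String watchers.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// The three client-side tables, as in ZKWatchManager.
class ClientTables {
    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();
    final Map<String, Set<String>> childWatches = new HashMap<>();
}

abstract class Registration {
    static final int OK = 0;
    static final int NONODE = -101; // value of KeeperException.Code.NONODE
    final ClientTables tables;
    final String clientPath;
    final String watcher;

    Registration(ClientTables tables, String clientPath, String watcher) {
        this.tables = tables;
        this.clientPath = clientPath;
        this.watcher = watcher;
    }

    abstract Map<String, Set<String>> getWatches(int rc);

    // Default rule: register only if the operation succeeded.
    boolean shouldAddWatch(int rc) {
        return rc == OK;
    }

    // Same shape as the register(int rc) method shown above.
    void register(int rc) {
        if (shouldAddWatch(rc)) {
            Map<String, Set<String>> watches = getWatches(rc);
            synchronized (watches) {
                watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
            }
        }
    }
}

class DataRegistration extends Registration {      // getData
    DataRegistration(ClientTables t, String path, String w) { super(t, path, w); }
    @Override Map<String, Set<String>> getWatches(int rc) { return tables.dataWatches; }
}

class ChildRegistration extends Registration {     // getChildren
    ChildRegistration(ClientTables t, String path, String w) { super(t, path, w); }
    @Override Map<String, Set<String>> getWatches(int rc) { return tables.childWatches; }
}

class ExistsRegistration extends Registration {    // exists
    ExistsRegistration(ClientTables t, String path, String w) { super(t, path, w); }
    // A missing node still gets a watch, so a later create is noticed.
    @Override boolean shouldAddWatch(int rc) { return rc == OK || rc == NONODE; }
    @Override Map<String, Set<String>> getWatches(int rc) {
        return rc == OK ? tables.dataWatches : tables.existWatches;
    }
}
```

This is why exists() is the usual way to wait for a node that has not been created yet: getData or getChildren on a missing node fail with NONODE and leave no watch behind.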
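public void processRequest(Request request) {
// ... (earlier handling omitted)
ProcessTxnResult rc = null;
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
rc = zks.processTxn(hdr, txn);
}
// ... (building and sending the response omitted)
}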
ZooKeeperServer code:
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst
.getTimeOut());
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
ZKDatabase code (ZKDatabase.processTxn delegates to DataTree.processTxn; the switch below is the DataTree part):
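public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
// ... (filling in the other fields of rc omitted)
switch (header.getType()) {
// ... (other cases omitted)
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
// ... (other cases omitted)
}
// ... (omitted)
return rc;
}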
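For a request that registers a Watcher, FinalRequestProcessor's processRequest checks whether the current request needs to register a Watcher. If it does, it passes the current ServerCnxn object and the data node path into getData. ServerCnxn represents a connection between a ZooKeeper client and the server, one object per client connection. The process callback discussed below is actually invoked on it, so ServerCnxn can be regarded as a Watcher object (it implements the Watcher interface). The data node path and the ServerCnxn are ultimately stored in WatchManager's watchTable and watch2Paths.
As mentioned earlier, WatchManager is responsible for triggering Watcher events. On the server, DataTree holds two WatchManager instances, dataWatches and childWatches, for data-change Watchers and child-change Watchers respectively. When an event such as a create, a delete, or a data change (setData) occurs, DataTree calls the corresponding method to trigger WatchManager's triggerWatch. triggerWatch returns the set of Watchers it triggered, and from there the flow moves on to calling process.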
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
//wrap the event type (EventType), notification state (KeeperState) and node path into a WatchedEvent
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
//take the Watchers for this path out of watchTable; removing them makes a Watcher one-shot.
//If there are none, no client has registered a Watcher on this node, so return directly.
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
return null;
}
//also drop this path from each Watcher's entry in watch2Paths
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//the stored Watcher is the request's ServerCnxn, so this calls ServerCnxn's process
w.process(e);
}
return watchers;
}
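Which WatchManager gets triggered, and with which event type, depends on the write. The hypothetical mini DataTree below follows createNode, setData and deleteNode in ZooKeeper 3.4.x as far as watches are concerned. A create fires NodeCreated on the path and NodeChildrenChanged on the parent. A setData fires NodeDataChanged. A delete fires NodeDeleted on the path through both managers, using the supress parameter so that no watcher hears it twice, and NodeChildrenChanged on the parent. Watchers are plain strings, and deliveries are recorded in a list instead of being sent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the server-side WatchManager.
class MiniWatchManager {
    private final Map<String, Set<String>> watchTable = new HashMap<>();
    private final List<String> delivered; // records "watcher:type:path" instead of sending

    MiniWatchManager(List<String> delivered) {
        this.delivered = delivered;
    }

    synchronized void addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // One-shot trigger with a "supress" set, like triggerWatch above.
    // Returns an empty set (the real method returns null) when nobody was watching.
    Set<String> triggerWatch(String path, String type, Set<String> supress) {
        Set<String> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
        }
        if (watchers == null) {
            return Collections.emptySet();
        }
        for (String w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue; // already notified through the other WatchManager
            }
            delivered.add(w + ":" + type + ":" + path);
        }
        return watchers;
    }
}

// Hypothetical mini DataTree: only the watch-triggering part of each write.
class MiniDataTree {
    final List<String> delivered = new ArrayList<>();
    final MiniWatchManager dataWatches = new MiniWatchManager(delivered);
    final MiniWatchManager childWatches = new MiniWatchManager(delivered);

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    void createNode(String path) {
        dataWatches.triggerWatch(path, "NodeCreated", null);
        childWatches.triggerWatch(parentOf(path), "NodeChildrenChanged", null);
    }

    void setData(String path) {
        dataWatches.triggerWatch(path, "NodeDataChanged", null);
    }

    void deleteNode(String path) {
        Set<String> processed = dataWatches.triggerWatch(path, "NodeDeleted", null);
        // a watcher registered in both managers for this path hears NodeDeleted only once
        childWatches.triggerWatch(path, "NodeDeleted", processed);
        childWatches.triggerWatch(parentOf(path), "NodeChildrenChanged", null);
    }
}
```

A child watch on /Members therefore fires once for the first create under it and then stays silent until it is registered again.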
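Handling a Watcher takes the following steps:
1. Wrap the event type (EventType), notification state (KeeperState), and node path into a WatchedEvent object.
2. Look up the Watchers for the data node path in watchTable. If none is found, no client has registered a Watcher on that node, and the method returns. If Watchers are found, they are taken out and removed from watchTable and watch2Paths at the same time. In other words, a Watcher is one-shot: once it has fired, it is no longer valid.
3. For a request that registers a Watcher, ZooKeeper stores the request's ServerCnxn as the Watcher, so the process method called here is really ServerCnxn's process. It marks the reply header with "-1" to indicate a notification, wraps the WatchedEvent into a WatcherEvent so it can be serialized for the network, and sends the notification to the client. The real callback runs on the client.
(8) Cluster monitoring example: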
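Step 3 is easier to see with the connection object playing the Watcher role. In the hypothetical sketch below, ServerConnection implements a one-method watcher interface. Its process runs no user code: it frames the event as a notification with xid -1 (the value ZooKeeper puts in the reply header of watch notifications) and hands it to a Consumer that stands in for the socket. ClientReader mimics how the client's reader thread tells notifications apart from ordinary replies by that xid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A framed message on the simulated wire: reply-header xid plus event fields.
final class Frame {
    final int xid;
    final String type;
    final String path;

    Frame(int xid, String type, String path) {
        this.xid = xid;
        this.type = type;
        this.path = path;
    }
}

// Hypothetical one-method watcher interface (the real one takes a WatchedEvent).
interface SimpleWatcher {
    void process(String type, String path);
}

// Plays ServerCnxn: stored as the Watcher, but process() only sends a notification.
class ServerConnection implements SimpleWatcher {
    static final int NOTIFICATION_XID = -1;
    private final Consumer<Frame> wire; // stands in for the socket to this client

    ServerConnection(Consumer<Frame> wire) {
        this.wire = wire;
    }

    @Override
    public void process(String type, String path) {
        wire.accept(new Frame(NOTIFICATION_XID, type, path));
    }
}

// Plays the client's reader: xid -1 means "watch notification", anything else is a reply.
class ClientReader {
    final List<String> notifications = new ArrayList<>(); // would be queued for the event thread
    final List<Integer> replies = new ArrayList<>();      // would complete pending packets

    void onFrame(Frame f) {
        if (f.xid == ServerConnection.NOTIFICATION_XID) {
            notifications.add(f.type + " " + f.path);
        } else {
            replies.add(f.xid);
        }
    }
}
```

Because notifications use the reserved xid, they never consume an entry of the client's pending queue and can arrive interleaved with ordinary replies.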
package com.shaoqing.zookeeper3;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable {
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive = true;
public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
connectionWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Watcher.Event.EventType.None
&& event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("\nconnectionWatcher Event Received: " + event);
}
}
};
childrenWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("\nchildrenWatcher Event Received: " + event);
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
// Get current list of child znode and reset the watch
List<String> children = zk.getChildren(membershipRoot, this);
System.out.println("Cluster Membership change,Members: " + children);
} catch (KeeperException ex) {
throw new RuntimeException(ex);
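} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
// stop the monitor: update the flag and wake up run(), which waits on the ClusterMonitor instance
synchronized (ClusterMonitor.this) {
alive = false;
ClusterMonitor.this.notifyAll();
}
throw new RuntimeException(ex);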
}
}
}
};
zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
// Ensure the parent znode exists
if (zk.exists(membershipRoot, false) == null) {
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// Set a watch on the parent znode
List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members:" + children);
}
public synchronized void close() {
try {
zk.close();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Override
public void run() {
try {
synchronized (this) {
while (alive) {
wait();
}
}
} catch (InterruptedException ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
if (args.length != 1) {
System.err.println("Usage: ClusterMonitor <Host:Port>");
System.exit(1);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}
package com.shaoqing.zookeeper3;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher, Runnable {
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort, Long pid) {
String processId = pid.toString();
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException ex) {
ex.printStackTrace();
}
if (zk != null) {
try {
zk.create(membershipRoot + '/' + processId, processId.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
} catch (KeeperException | InterruptedException ex) {
ex.printStackTrace();
}
}
}
public synchronized void close() {
try {
zk.close();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
System.out.println("\nEvent Received: " + event);
}
@Override
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: ClusterClient <Host:Port>");
System.exit(1);
}
String hostPort = args[0];
// Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0, index));
new ClusterClient(hostPort, processId).run();
}
}
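The example above shows how to watch a ZNODE: when that ZNODE changes, the corresponding method is triggered to handle it. Because Watchers are one-shot, childrenWatcher re-registers itself by calling getChildren(membershipRoot, this) every time it fires. This approach can be used for data watching, cluster state monitoring, and similar purposes.
Git repository: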
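A NodeChildrenChanged event does not say which child was added or removed. Several changes may also happen between the event and the getChildren call that re-registers the watch. So if the monitor needs to know who joined or left, it has to compare successive child lists. MembershipDiff below is a hypothetical helper sketching that comparison with plain collections.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper: compare two successive getChildren results.
final class MembershipDiff {
    final SortedSet<String> joined;
    final SortedSet<String> left;

    private MembershipDiff(SortedSet<String> joined, SortedSet<String> left) {
        this.joined = joined;
        this.left = left;
    }

    static MembershipDiff of(Collection<String> before, Collection<String> after) {
        SortedSet<String> joined = new TreeSet<>(after);
        joined.removeAll(before);              // present now, absent before
        SortedSet<String> left = new TreeSet<>(before);
        left.removeAll(after);                 // present before, absent now
        return new MembershipDiff(joined, left);
    }

    public static void main(String[] args) {
        MembershipDiff d = of(Arrays.asList("101", "102"), Arrays.asList("102", "103"));
        System.out.println("joined=" + d.joined + " left=" + d.left); // joined=[103] left=[101]
    }
}
```

In ClusterMonitor one could keep the previous list in a field (a hypothetical lastMembers) and call MembershipDiff.of(lastMembers, children) right after getChildren(membershipRoot, this). A member that joins and leaves between two events will not show up in either set.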
github.com/wacxt/zooke…
References: 1. 《从 Paxos 到 ZooKeeper》 (From Paxos to ZooKeeper)