zookeeper可以对某个节点进行监听,监听的事件有以下4种:
- NodeCreated (节点创建)
- NodeDeleted (节点删除)
- NodeDataChanged (节点数据变化)
- NodeChildrenChanged (子节点变化,包括子节点删除、创建和数据变化)
但每个事件只能触发一次,之后监听就不再生效
本文介绍怎么实现zookeeper对节点的循环监听,即永久生效
1. 流程
客户端发起对节点的事务操作(以NodeChildrenChanged事件为例)
服务端监听到对应的事件后进行相应的操作
2. 代码实现
(1) Client.java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Client {
private static final String CONNECT_STRING = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int SESSION_TIMEOUT = 5000;
private static final String PARENT = "/name";
private static final String CHILD = "tony";
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, null);
// 客户端创建了一个子节点,会触发NodeChildrenChanged事件
String path = zk.create(PARENT + "/" + CHILD, CHILD.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(path);
zk.close();
}
}
(2) Server.java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
/**
* @Description: 循环监听某节点
* @author Jed
* @date 2017年12月19日
*/
public class Server {
private static ZooKeeper zk;
private static final String CONNECT_STRING = "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int SESSION_TIMEOUT = 5000;
private static final String PARENT = "/name";
public static void main(String[] args) throws Exception {
zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
EventType type = event.getType();
KeeperState state = event.getState();
System.out.println(path + "\t" + type + "\t" + state);
// 循环监听
try {
zk.getChildren(PARENT, true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 添加监听
zk.getChildren(PARENT, true);
// 模拟服务器一直运行
Thread.sleep(Long.MAX_VALUE);
}
}
3. 测试
- 首先运行server.java
控制台输出:
null None SyncConnected
这是获取连接时的事件,每次获得连接都会触发
- 然后运行一次Client.java
Client控制台输出:
/name/tony0000000000
因为Client创建的是PERSISTENT_SEQUENTIAL
类型的节点,所以会自动递增的编号
Server控制台的内容实时更新:
null None SyncConnected
/name NodeChildrenChanged SyncConnected
- 再运行一次Client.java
Client控制台输出:
/name/tony0000000001
Server控制台的内容也会实时更新:
null None SyncConnected
/name NodeChildrenChanged SyncConnected
/name NodeChildrenChanged SyncConnected
至此,我们已经实现了对某个节点的循环监听!
说明:
- 监听到对应的事件触发后,我们只是做了打印,具体的行为应该根据业务逻辑来设计
- 监听某个节点的子节点变化(NodeChildrenChangeds)事件,首先该节点应该存在,本例中监听/name节点的子节点变化,那么/name节点应该提前创建好