Zookeeper 是一个分布式协调服务的开源框架。主要用来解决分布式集群中
应用系统的一致性问题。本文利用Zookeeper的JavaApi来实现一个数据订阅发布功能
主要内容:
- 1.常用API
- 2.订阅/发布
相关文章:
1.Zookeeper之集群安装
2.Zookeeper之数据订阅发布
1.常用API
引入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
常用操作
// 初始化 ZooKeeper
ZooKeeper zk = new ZooKeeper("hadoop1:2181,hadoop2:2181,hadoop3:2181", 30000, event -> System.out.println("已经触发了" + event.getType() + "事件!"));
// 创建一个目录节点
zk.create("/parent", "parent".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 创建一个子目录节点
zk.create("/parent/child1", "child1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 取出节点数据
System.out.println(new String(zk.getData("/parent", false, null)));
// 取出子目录节点列表
System.out.println(zk.getChildren("/parent", true));
// 修改子目录节点数据
zk.setData("/parent/child1", "child1".getBytes(), -1);
System.out.println("目录节点状态:[" + zk.exists("/parent", true) + "]");
// 创建另外一个子目录节点
zk.create("/parent/child2", "child2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/parent/child2", true, null)));
// 删除子目录节点
zk.delete("/parent/child1", -1);
zk.delete("/parent/child2", -1);
// 删除父目录节点
zk.delete("/parent", -1);
// 关闭连接
zk.close();
2.订阅/发布
Zookeeper的数据结构为
-----/
-----|----servers
-------------|---server1
-------------|---server2
2.1.订阅
public class SubClient {
static ZooKeeper zk;
static String server = "/servers";
public static void main(String[] args) throws Exception {
String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
zk = new ZooKeeper(hosts, 30000, event -> {
// 监听"/servers"节点下的子节点变化事件, 更新server列表, 并重新注册监听
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
&& (server).equals(event.getPath())) {
try {
updateServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
updateServerList();
Thread.sleep(Integer.MAX_VALUE);
}
public static void updateServerList() throws Exception {
// 如果"/servers"不存在,则创建
if (zk.exists(server, false) == null) {
zk.create(server, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
List<String> newServerList = new ArrayList<>();
// 获取并监听"/servers"的子节点变化
// watch参数为true, 表示监听子节点变化事件.
// 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
List<String> subList = zk.getChildren(server, true);
for (String subNode : subList) {
// 获取每个子节点下关联的server地址
byte[] data = zk.getData(server + "/" + subNode, false, null);
newServerList.add(new String(data, "utf-8"));
}
System.out.println(newServerList);
}
}
监听”/servers”的子节点的变化,每个子节点里存放着一个服务
2.2.发布
public class PubClient {
static String server = "/servers";
public static void main(String[] args) throws Exception {
String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
ZooKeeper zk = new ZooKeeper(hosts, 30000, null);
// 如果"/servers"不存在,则创建
if (zk.exists(server, false) == null) {
zk.create(server, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
// 注册服务
zk.create(server + "/" + "server1", "192.168.1.1:8080".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(10000);
zk.close();
}
}
向”/servers”目录下创建临时子节点,其中每个子节点保存一个服务,可以修改子节点名称,发布多个服务,暂停10s后,断开连接,临时子节点会自动删除。
输出结果如下:
[]
[192.168.1.2:8080]
[192.168.1.2:8080, 192.168.1.1:8080]
[192.168.1.1:8080]
[]