Zookeeper之数据订阅发布

Zookeeper 是一个分布式协调服务的开源框架。主要用来解决分布式集群中
应用系统的一致性问题。本文利用Zookeeper的JavaApi来实现一个数据订阅发布功能

主要内容:

  • 1.常用API
  • 2.订阅/发布

相关文章:
1.Zookeeper之集群安装
2.Zookeeper之数据订阅发布

1.常用API

引入依赖

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.10</version>
</dependency>

常用操作

// 初始化 ZooKeeper
ZooKeeper zk = new ZooKeeper("hadoop1:2181,hadoop2:2181,hadoop3:2181", 30000, event -> System.out.println("已经触发了" + event.getType() + "事件!"));

// 创建一个目录节点
zk.create("/parent", "parent".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

// 创建一个子目录节点
zk.create("/parent/child1", "child1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

// 取出节点数据
System.out.println(new String(zk.getData("/parent", false, null)));

// 取出子目录节点列表
System.out.println(zk.getChildren("/parent", true));

// 修改子目录节点数据
zk.setData("/parent/child1", "child1".getBytes(), -1);
System.out.println("目录节点状态:[" + zk.exists("/parent", true) + "]");

// 创建另外一个子目录节点
zk.create("/parent/child2", "child2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/parent/child2", true, null)));

// 删除子目录节点
zk.delete("/parent/child1", -1);
zk.delete("/parent/child2", -1);

// 删除父目录节点
zk.delete("/parent", -1);

// 关闭连接
zk.close();

2.订阅/发布

Zookeeper的数据结构为

-----/
-----|----servers
-------------|---server1
-------------|---server2

2.1.订阅

public class SubClient {
    static ZooKeeper zk;
    static String server = "/servers";

    public static void main(String[] args) throws Exception {
        String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
        zk = new ZooKeeper(hosts, 30000, event -> {
            // 监听"/servers"节点下的子节点变化事件, 更新server列表, 并重新注册监听
            if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
                    && (server).equals(event.getPath())) {
                try {
                    updateServerList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        updateServerList();
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static void updateServerList() throws Exception {
        // 如果"/servers"不存在,则创建
        if (zk.exists(server, false) == null) {
            zk.create(server, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }

        List<String> newServerList = new ArrayList<>();
        // 获取并监听"/servers"的子节点变化
        // watch参数为true, 表示监听子节点变化事件.
        // 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
        List<String> subList = zk.getChildren(server, true);
        for (String subNode : subList) {
            // 获取每个子节点下关联的server地址
            byte[] data = zk.getData(server + "/" + subNode, false, null);
            newServerList.add(new String(data, "utf-8"));
        }
        System.out.println(newServerList);
    }
}

监听”/servers”的子节点的变化,每个子节点里存放着一个服务

2.2.发布

public class PubClient {

    static String server = "/servers";

    public static void main(String[] args) throws Exception {
        String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
        ZooKeeper zk = new ZooKeeper(hosts, 30000, null);
        // 如果"/servers"不存在,则创建
        if (zk.exists(server, false) == null) {
            zk.create(server, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
        // 注册服务
        zk.create(server + "/" + "server1", "192.168.1.1:8080".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Thread.sleep(10000);
        zk.close();
    }
}

向”/servers”目录下创建临时子节点,其中每个子节点保存一个服务,可以修改子节点名称,发布多个服务,暂停10s后,断开连接,临时子节点会自动删除。

输出结果如下:

[]
[192.168.1.2:8080]
[192.168.1.2:8080, 192.168.1.1:8080]
[192.168.1.1:8080]
[]

参考:
1.【Zookeeper系列五】ZooKeeper 实时更新server列表

    原文作者:阿坤的博客
    原文地址: https://www.jianshu.com/p/f24dde11d783
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞