Zookeeper入门之三-Java客户端curator的使用

ZK的java客户端—curator 基本使用

普通的增删改查实现–同步接口
 public class CuratorConTest {

    static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 重试3次

    // 创建连接 -- 传统写法
/*        CuratorFramework zkClient = CuratorFrameworkFactory.newClient("localhost:32770",
                5000,
                3000,
                policy);*/

    // 创建连接-- 流式写法
    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(policy)
            .namespace("zk-jsy")  // 如果指定了某个应用只能在某一个节点下操作,
            // 可以指定namespace,这里base表示路径为/base。记得不能直接用/,会报错。
            .build();


    public static void main(String[] args) throws Exception {

        // 连接开启
//        zkClient.start();

        zkFluentClient.start();

        // 测试创建
        CuratorConTest test = new CuratorConTest();
        test.testCreate();

        // 测试获取
        test.testGet();

        // 测试更新
        test.testUpdate();

        // 测试删除
        test.testDelete();

//        Thread.sleep(Integer.MAX_VALUE);
    }

    private void testCreate() throws Exception {
        // 1、创建默认类型节点,书上说默认内容为空,但是实际上上我本地的ip地址
        //org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /zk-jsy/book
        // 是因为zookeeper的版本和curator的版本不兼容导致的,默认zk的版本是3.5.1-Alpha,降级成3.4.8即可
        zkFluentClient.create().forPath("/book1-" + ThreadLocalRandom.current().nextInt());

        // 2、创建有默认值的节点
        zkFluentClient.create().forPath("/book2-" + ThreadLocalRandom.current().nextFloat(), "mytestbook2Create".getBytes());

        // 3、创建临时节点,断开后会自动清除
        zkFluentClient.create().withMode(CreateMode.EPHEMERAL).
            forPath("/book3-" + ThreadLocalRandom.current().nextInt());

        // 4、创建临时节点,同时如果父节点不存在,也把父节点创建了。但是父节点会是持久节点
        zkFluentClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL).
            forPath("/test/book4-test" + ThreadLocalRandom.current().nextInt());
    }

    private void testGet() throws Exception {
        String path = "/getData/mydata-" + ThreadLocalRandom.current().nextInt();
        zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
                ("sogetdata" + ThreadLocalRandom.current().nextInt()).getBytes());

        // 1、获取,注意返回的是bytes
        String value = new String(zkFluentClient.getData().forPath(path));
        System.out.println(value);

        // 2、获取属性
        Stat stat = new Stat();
        String value11 = new String(zkFluentClient.getData().storingStatIn(stat).forPath(path));
        System.out.println(stat.toString());
        System.out.println(value11);
    }

    private void testUpdate() throws Exception {
        String path = "/updateData/mydata-" + ThreadLocalRandom.current().nextInt();
        zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
                ("toBeUpdate" + ThreadLocalRandom.current().nextInt()).getBytes());
        System.out.println("originData:" + new String(zkFluentClient.getData().forPath(path)));

        // 1、 普通的update,不管version
        Stat stat = zkFluentClient.setData().forPath(path, ("newData" + ThreadLocalRandom.current().nextInt()).getBytes());
        System.out.println("newData:" + new String(zkFluentClient.getData().forPath(path)));

        // 2、乐观锁更新,可以用来实现CAS,如果version不匹配,是无法更新的
        zkFluentClient.setData().withVersion(stat.getVersion()).forPath(path, ("UpdateByVersion:" + stat.getVersion()).getBytes());
        System.out.println("updateByVersionData:" + new String(zkFluentClient.getData().forPath(path)));

        // 2.1 测试cas,传入version=1,而当前实际为2
        zkFluentClient.setData().withVersion(1).forPath(path,"error".getBytes());
        System.out.println("updateByErrorVersionData:" + new String(zkFluentClient.getData().forPath(path)));
        // KeeperErrorCode = BadVersion for /zk-jsy/updateData/mydata--1431282676 返回这个异常,version不对
    }

    private void testDelete() throws Exception {

        // 1、普通删除,但是不能删除含有叶子节点的父节点
        String path = "/book/forDelete" + ThreadLocalRandom.current().nextInt();
        zkFluentClient.create().creatingParentsIfNeeded().forPath(path);
        //Thread.sleep(20000);  // sleep期间可以看到对应的节点
        zkFluentClient.delete().forPath(path);  // 删除节点

        // 2、删除节点,以及递归删除其子节点,如果传入/,删除的是 namespace下的根目录
        zkFluentClient.delete().deletingChildrenIfNeeded().forPath("/book");

        // 3、无论如何,只要客户端连接存在,就会一直重试,直到删除成功,避免因为集群选主等情况造成数据无法清除
        zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath("/");
    }
}
创建的异步实现
/**
 * 异步实现
 */
public class AsyncCuratorTest {

    static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 重试3次

    // 创建连接-- 流式写法
    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(policy)
            .namespace("zk-asyncjsy")  // 如果指定了某个应用只能在某一个节点下操作,
            // 可以指定namespace,这里base表示路径为/base。记得不能直接用/,会报错。
            .build();

    static CountDownLatch countDownLatch = new CountDownLatch(2); // countDownLatch

    static ExecutorService tp = Executors.newFixedThreadPool(2); // ThreadPool

    public static void main(String[] args) throws Exception {

        String path = "/asyncCreate" + ThreadLocalRandom.current().nextInt();

        zkFluentClient.start();

        // 异步创建 -1
        zkFluentClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());

                        System.out.println("Thread of processResult:" + Thread.currentThread().getName());

                        countDownLatch.countDown();
                    }
                }, tp).forPath(path, "createInfo".getBytes());

        // 异步创建 -2 ,会重复,返回错误码
        zkFluentClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());

                        System.out.println("Thread of processResult:" + Thread.currentThread().getName());

                        countDownLatch.countDown();
                    }
                }).forPath(path, "createAgain".getBytes());

        countDownLatch.await();
        tp.shutdown();
    }
}

执行结果如下:

Event[code:0, type:CREATE
Thread of processResult:pool-3-thread-1
Event[code:-110, type:CREATE
Thread of processResult:main-EventThread

注意以下方面:

  1. 第一次异步回调传入了 线程池tp,用于在线程池中执行对应的回调,可以从结果中看到,执行的线程并不是Main线程
  2. 第二次异步回调,并没有传入线程池tp,所以执行操作的是主线程Main
  3. countDownLatch在这里只是为了保证线程执行结束后,可以shutdown线程池
  4. 返回的event.getResultCode如果是0,表示操作成功,如果是其他值,表示不成功,比如-110,表示数据节点已存在。
    原文作者:AlanKim
    原文地址: https://www.jianshu.com/p/ccd31ddcd2d6
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞