Zookeeper之JAVA API学习笔记

Zookeeper提供了多种编程语言API来编写Zookeeper(后面简称zk)客户端程序,典型的如JAVA和C语言。在本文中,我们介绍如何使用Java Api。本文包含如下章节的内容:

  • 开发环境配置客户端
  • 基本流程
  • zk操作
  • 异步API
  • 小结

参考资料:
1、如果希望先对Zookeeper组件的基本概念有所了解,可先阅读《Zookeeper学习笔记》

一、开发环境配置

利用java api编写zk客户端程序,需要依赖zk的api Jar包。在本案例中要想代码成功编译需要引入zk解压目录下的 zookeeper-3.4.10.jar 。

因为运行时依赖日志框架,还需要引入zk解压目录下lib目录下的log4j-1.2.16.jar和slf4j-api-1.6.1.jar。这样我们就可以在IDE环境中编译和运行程序了。

运行编写的zk客户端程序之前,需要先启动zk服务,在开发阶段,可以以独立模式启动zk服务。具体可参见《Zookeeper学习笔记》

二、基本代码流程

zk的提供的api中,核心的是ZooKeeper类,该类的对象用于建立和维护与zk服务器的连接,并提供各种操作zk节点的方法。

编写zk客户端程序,首先要创建ZooKeeper类的实例,ZooKeeper类有多个重构的构造函数,我们先介绍一个基本的构造函数,定义如下:

public ZooKeeper(String connectString, int sessionTimeout, 
  Watcher watcher)throws IOException;

该构造函数有3个参数,第1个参数是指定zk服务器的地址和端口信息;第2个参数是指定超时时间(单位毫秒);第3个参数是是一个Watcher对象实例。

Watcher类也是zk提供的Java api中的一个重要类,它是一个接口,用于实现zk的观察机制。在创建ZooKeeper类的实例时,和在调用ZooKeeper类的方法时,可以为znode设置观察触发器,当znode发生变化时,zk的观察机制就会通过触发触发器让客户端得到通知。

Watcher接口的定义如下:

public interface Watcher {
    ....
   abstract public void process(WatchedEvent event);
}

Watcher接口中定义了一个process方法,该方法有一个参数event,通过该参数可获取zk框架回调process方法时传入的通知信息。
event对应的WatchedEvent类中提供了几个方法可以获取具体的信息,如:
event.getState()//获取事件状态
event.getType() //获取事件类型
event.getPath() //获取事件出发的znode路径

对于调用ZooKeeper类构造函数传入的Watcher实例,有两个作用。一是当zk客户端与服务器建立链接和断开链接时,该触发器会被触发。另外是作为缺省的触发器,当znode节点被设置了观察但没有设置具体的触发器时,如果该znode节点发生变化,则该缺省触发器会被调用。这个在后面章节会有具体介绍。

需要注意的是,通过构造函数创建ZooKeeper对象实例时,会启动一个单独的线程连接zk服务。因为调用ZooKeeper类的构造函数是立即返回的,因此,如果返回后立即使用ZooKeeper对象的方法,这时很可能zk对象还未与服务器建立连接(因为是在单独线程中去连接,肯定有时延),这样就会报错。也就是说,必须等该线程成功连接zk服务后,才能使用被创建的ZooKeeper对象。

那问题是我们怎么知道何时成功连接zk服务器呢?这时创建ZooKeeper类实例时调用的构造函数传入的Watcher对象就发挥作用了,一旦连接成功,zk就会回调Watcher对象中的process方法,这样我们在process方法中就可以判断是否连接成功。

因为调用ZooKeeper构造函数和process方法被执行是在两个线程中,需要利用信号量来同步两个操作。这时我们可以利用java.util.concurrent包中的CountDownLatch类。

CountDownLatch类可用于实现同步机制,CountDownLatch对象实例中有一个计数器,当该计数器的值大于0时,调用它的await方法就会堵塞,如果该计数器的值变为0时,await方法就会返回。这样我们可以创建一个全局的CountDownLatch实例,通过构造函数设置对象计数器的初始值设为1,如下面代码:

CountDownLatch connectedSemaphore= **new** CountDownLatch(1);

然后在创建ZooKeeper对象后调用CountDownLatch的await()方法,这样该语句会被堵塞。并在Watcher的process方法中检测是否有连接成功的通知,如果有,调用CountDownLatch类的countDown()方法让计数器减1变为0,这样前面的await()语句就由堵塞变成不堵塞返回。这时就可以使用被创建的ZooKeeper对象了。

最后我们不在使用ZooKeeper对象时,要调用ZooKeeper的close方法关闭与zk服务器的连接。

所以使用zk提供的api编写程序的一个基本模式是:

主程序中的伪代码:

{
设置CountDownLatch对象的计数器为1;
创建ZooKeeper对象;
调用CountDownLatch对象的await方法等到连接服务器成功;
执行zk的操作(如创建znode节点);
关闭连接;
}

Watcher接口的process方法中代码:

{
判断是否连接服务器成功;
如果是,设置CountDownLatch对象的计数器为0
}

下面我们来看具体的一些zk操作例子。

三、创建znode节点

创建znode节点可使用ZooKeeper类的create方法,完整的代码例子如下:

package com.zkexample;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class CreateZnode implements Watcher {
    private ZooKeeper zk;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
    public static void main(String[] args) throws Exception {
        CreateZnode createZnode = new CreateZnode();
        createZnode.connect();
        createZnode.action();
        createZnode.close();
        System.out.println("create znode success");
    }
    
    public void  connect() throws IOException, InterruptedException{
        zk = new ZooKeeper("localhost:2181",10000,this);
        connectedSemaphore.await();
    }
    
    public void action() throws KeeperException, InterruptedException{
        zk.create("/zoo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    
    public void close() throws InterruptedException{
        zk.close();
    }

    @Override
    public void process(WatchedEvent event) {
        if(event.getState()==KeeperState.SyncConnected){
            connectedSemaphore.countDown();
        }
    }
}

下面我们来对上面的代码进行解释(部分代码的含义在上一章中已经描述,这里不再重复解释)。

1、上面代码定义了一个普通的java类(类名为CreateZnode),实现了Watcher接口的process方法,Watcher接口的含义见上一章节中介绍。

2、CreateZnode 类中有三个方法:
1)connect方法用于创建ZooKeeper对象,并等待连接成功。
2)action方法中调用了ZooKeeper类中的create方法来创建znode节点。
3)close方法中调用了ZooKeeper类中的close方法来关闭zk客户端与zk服务器的连接。

3、ZooKeeper类中的create方法
create方法用于创建znode节点,有多个重载方法,上面例子中用到的方法定义如下:

public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException;

该方法有4个参数:
1)第1个参数是待创建的znode的路径
2)第2个参数是znode存储的数据,本例中传入null
3)第3个参数是ACL列表,本例传入的常量表示该节点的权限不受控制。
4)第4个参数是创建的节点类型,本例传入的常量表示创建的是持久节点。

上面代码的工程只要引入适当的Jar包(见前面章节介绍),就可以直接在IDE中编译和运行。

四、删除znode节点

ZooKeeper类提供的delete方法可以删除指定的znode节点。delete方法定义如下:

void delete(String path, int version) throws InterruptedException, 
   KeeperException;

该方法有两个参数,第1个参数是待删除的znode的路径,第2个参数是版本号。
1)如果version参数指定的值与待删除的znode的dataVersion不一致,则无法删除,会抛异常。可以传入值-1,则不管版本号是多少都会删除。
2)如果待删除的znode不存在,则delete方法会抛异常。
3)如果待删除znode下有子节点,无法删除,会抛异常

例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):

public void action() throws KeeperException, InterruptedException{
        zk.delete("/zoo/hello", 0);
    }

上面代码的含义是删除数据版本号为0的节点路径为”/zoo/hello”的znode。
如果该znode上设置了观察,则执行delete操作后,会触发观察。

五、设置znode存储的数据

ZooKeeper类提供的setData方法可以给znode设置要存储的数据。setData方法定义如下:

Stat setData(String path, byte[] data, int version) throws 
  KeeperException, InterruptedException;

该方法有3个参数,第1个参数是待设置的znode的路径,第2个参数是待设置的内容,第3个参数是版本号。
如果version参数指定的值与待设置的znode的dataVersion不一致,则无法设置,会抛异常。可以传入值-1,则不管版本号是多少都会被设置。
如果待设置znode不存在,会抛异常。
该方法有返回值,返回的是待设置节点的元数据信息。

例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):

public void action() throws KeeperException, InterruptedException{
        byte[] data = "adad".getBytes();
        Stat stat = zk.setData("/zoo", data , -1);
    }

如果该znode上设置了观察,则执行setData操作后,会触发观察。

六、测试znode是否存在

ZooKeeper类提供的exists方法可以检查指定的znode是否存在,如果不存在,返回null;如果存在返回封装有znode元数据的Stat对象。exists方法有两个重载的方法,分别定义如下:

public Stat exists(final String path, Watcher watcher)
        throws KeeperException, InterruptedException;

上面方法的第1个参数是待检查znode的路径;第2个参数是设置观察触发器,如果该节点被创建、删除或其数据被修改,设置的观察将被触发。

public Stat exists(String path, boolean watch) throws 
        KeeperException,InterruptedException;

上面方法的第1个参数是待检查znode的路径;第2个参数是个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。

七、查看znode子节点

ZooKeeper类提供的getChildren方法可以获取指定znode下的子节点列表的名称(注意不是绝对路径)以及返回znode的状态。getChildren方法有多个重构的方法,分别定义如下(省略了异常抛出的签名):

List<String> getChildren(final String path, Watcher watcher);

List<String> getChildren(String path, boolean watch);

List<String> getChildren(final String path, Watcher watcher,Stat stat);

List<String> getChildren(String path, boolean watch, Stat stat);

1)path参数是待查看的znode的路径。
2)wathcer参数是为该znode设置观察触发器,如果该znode被删除,或znode有子节点被创建和删除,则该wathcer会被触发。
3)watch参数是一个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
4)stat参数是用于返回该znode的元数据信息。

注意,如果传入的znode路径不存在,则getChildren方法会抛出异常。

例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):

public void action() throws KeeperException, InterruptedException{
        List<String> childern = zk.getChildren("/", false);
        for(String path:childern){
            System.out.println(path);
        }
    }

八、获取znode存储的数据

ZooKeeper类提供的getData方法可以获取指定znode存储的数据以及返回znode的状态信息。getDatashuju方法有多个重构的方法,分别定义如下(省略了异常抛出的签名):

byte[] getData(final String path, Watcher watcher, Stat stat);
byte[] getData(String path, boolean watch, Stat stat);

1)path参数是待获取的znode的路径。
2)wathcer参数是为该znode设置观察触发器,如果该znode被删除,或其数据被更新,则该wathcer会被触发。
3)watch参数是一个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
4)stat参数是用于返回该znode的元数据信息。

注意,如果传入的znode路径不存在,则getData方法会抛出异常。

九、异步API

上面介绍的API都是同步的,即操作的返回结果通过API调用的返回值或参数直接返回。不过zk提供的API还提供了异步操作的API。因为所有异步操作的结果是通过回调函数来传送的,因此异步API方法的返回类型都是void,且没有异常抛出,错误信息通过回调函数的参数获取。

下面我们通过exists方法来举例说明,异步exists方法的定义如下:

void exists(String path, boolean watch, StatCallback cb, Object ctx);

该方法有4个参数,含义如下:
1)path参数,待操作的znode的路径
2)watch参数,表示在节点上是否设置观察。具体含义同前面的介绍
3)cb参数,回调函数,下面再详细介绍
4)ctx参数,调用exists方法额外传入的参数,可用于被在回调函数中区分不同请求。

异步方法传入的回调是StatCallback 接口的实现,StatCallback 接口的定义如下:

interface StatCallback extends AsyncCallback {
  public void processResult(int rc, String path,    Object ctx, Stat stat);
}

processResult就是被回调的方法,该方法有4个参数,含义如下:
1)rc参数表示zx操作出错的异常错误码,如果为0,表示操作成功,每个非0值,代表一种异常。
2)path参数,对应调用exists方法传入的参数,可用于识别这个回调所响应的请求。
3)ctx参数,也是对应调用exists方法传入的参数,可用于在path不足识别请求时额外传入的值,如果不需要,调用exists方法时可传入null。
4)stat参数,查询获取的结果,类似同步exists方法的返回值。

需要注意的是,调用异步方法时(如调用异步的exists方法),异步方法会立即返回。StatCallback 接口中的processResult方法被回调是在另外的线程中处理的。

下面看下例子代码:

public void action(){
        zk.exists("/zoo", false, new StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println(stat.getCtime());
            }
        }, null);
    }

十、小结

上面我们介绍了zk提供的JAVA api,用于编写zk客户端程序。

我们可以看出,zk客户端程序调用exists,getData,getChildren方法时可以为节点设置观察触发器。当该节点被zk客户端程序执行create,delete,setData方法时,节点上设置的触发器会被触发。

除了上述常见的API外,还有获取和设置znode的ACL(访问控制列表)信息,这个不在文本介绍。

    原文作者:我是老薛
    原文地址: https://www.jianshu.com/p/519acfc985e2
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞