Zookeeper提供了多种编程语言API来编写Zookeeper(后面简称zk)客户端程序,典型的如JAVA和C语言。在本文中,我们介绍如何使用Java Api。本文包含如下章节的内容:
- 开发环境配置客户端
- 基本流程
- zk操作
- 异步API
- 小结
参考资料:
1、如果希望先对Zookeeper组件的基本概念有所了解,可先阅读《Zookeeper学习笔记》。
一、开发环境配置
利用java api编写zk客户端程序,需要依赖zk的api Jar包。在本案例中要想代码成功编译需要引入zk解压目录下的 zookeeper-3.4.10.jar 。
因为运行时依赖日志框架,还需要引入zk解压目录下lib目录下的log4j-1.2.16.jar和slf4j-api-1.6.1.jar。这样我们就可以在IDE环境中编译和运行程序了。
运行编写的zk客户端程序之前,需要先启动zk服务,在开发阶段,可以以独立模式启动zk服务。具体可参见《Zookeeper学习笔记》。
二、基本代码流程
zk的提供的api中,核心的是ZooKeeper类,该类的对象用于建立和维护与zk服务器的连接,并提供各种操作zk节点的方法。
编写zk客户端程序,首先要创建ZooKeeper类的实例,ZooKeeper类有多个重构的构造函数,我们先介绍一个基本的构造函数,定义如下:
public ZooKeeper(String connectString, int sessionTimeout,
Watcher watcher)throws IOException;
该构造函数有3个参数,第1个参数是指定zk服务器的地址和端口信息;第2个参数是指定超时时间(单位毫秒);第3个参数是是一个Watcher对象实例。
Watcher类也是zk提供的Java api中的一个重要类,它是一个接口,用于实现zk的观察机制。在创建ZooKeeper类的实例时,和在调用ZooKeeper类的方法时,可以为znode设置观察触发器,当znode发生变化时,zk的观察机制就会通过触发触发器让客户端得到通知。
Watcher接口的定义如下:
public interface Watcher {
....
abstract public void process(WatchedEvent event);
}
Watcher接口中定义了一个process方法,该方法有一个参数event,通过该参数可获取zk框架回调process方法时传入的通知信息。
event对应的WatchedEvent类中提供了几个方法可以获取具体的信息,如:
event.getState()//获取事件状态
event.getType() //获取事件类型
event.getPath() //获取事件出发的znode路径
对于调用ZooKeeper类构造函数传入的Watcher实例,有两个作用。一是当zk客户端与服务器建立链接和断开链接时,该触发器会被触发。另外是作为缺省的触发器,当znode节点被设置了观察但没有设置具体的触发器时,如果该znode节点发生变化,则该缺省触发器会被调用。这个在后面章节会有具体介绍。
需要注意的是,通过构造函数创建ZooKeeper对象实例时,会启动一个单独的线程连接zk服务。因为调用ZooKeeper类的构造函数是立即返回的,因此,如果返回后立即使用ZooKeeper对象的方法,这时很可能zk对象还未与服务器建立连接(因为是在单独线程中去连接,肯定有时延),这样就会报错。也就是说,必须等该线程成功连接zk服务后,才能使用被创建的ZooKeeper对象。
那问题是我们怎么知道何时成功连接zk服务器呢?这时创建ZooKeeper类实例时调用的构造函数传入的Watcher对象就发挥作用了,一旦连接成功,zk就会回调Watcher对象中的process方法,这样我们在process方法中就可以判断是否连接成功。
因为调用ZooKeeper构造函数和process方法被执行是在两个线程中,需要利用信号量来同步两个操作。这时我们可以利用java.util.concurrent包中的CountDownLatch类。
CountDownLatch类可用于实现同步机制,CountDownLatch对象实例中有一个计数器,当该计数器的值大于0时,调用它的await方法就会堵塞,如果该计数器的值变为0时,await方法就会返回。这样我们可以创建一个全局的CountDownLatch实例,通过构造函数设置对象计数器的初始值设为1,如下面代码:
CountDownLatch connectedSemaphore= **new** CountDownLatch(1);
然后在创建ZooKeeper对象后调用CountDownLatch的await()方法,这样该语句会被堵塞。并在Watcher的process方法中检测是否有连接成功的通知,如果有,调用CountDownLatch类的countDown()方法让计数器减1变为0,这样前面的await()语句就由堵塞变成不堵塞返回。这时就可以使用被创建的ZooKeeper对象了。
最后我们不在使用ZooKeeper对象时,要调用ZooKeeper的close方法关闭与zk服务器的连接。
所以使用zk提供的api编写程序的一个基本模式是:
主程序中的伪代码:
{
设置CountDownLatch对象的计数器为1;
创建ZooKeeper对象;
调用CountDownLatch对象的await方法等到连接服务器成功;
执行zk的操作(如创建znode节点);
关闭连接;
}
Watcher接口的process方法中代码:
{
判断是否连接服务器成功;
如果是,设置CountDownLatch对象的计数器为0
}
下面我们来看具体的一些zk操作例子。
三、创建znode节点
创建znode节点可使用ZooKeeper类的create方法,完整的代码例子如下:
package com.zkexample;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class CreateZnode implements Watcher {
private ZooKeeper zk;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
CreateZnode createZnode = new CreateZnode();
createZnode.connect();
createZnode.action();
createZnode.close();
System.out.println("create znode success");
}
public void connect() throws IOException, InterruptedException{
zk = new ZooKeeper("localhost:2181",10000,this);
connectedSemaphore.await();
}
public void action() throws KeeperException, InterruptedException{
zk.create("/zoo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public void close() throws InterruptedException{
zk.close();
}
@Override
public void process(WatchedEvent event) {
if(event.getState()==KeeperState.SyncConnected){
connectedSemaphore.countDown();
}
}
}
下面我们来对上面的代码进行解释(部分代码的含义在上一章中已经描述,这里不再重复解释)。
1、上面代码定义了一个普通的java类(类名为CreateZnode),实现了Watcher接口的process方法,Watcher接口的含义见上一章节中介绍。
2、CreateZnode 类中有三个方法:
1)connect方法用于创建ZooKeeper对象,并等待连接成功。
2)action方法中调用了ZooKeeper类中的create方法来创建znode节点。
3)close方法中调用了ZooKeeper类中的close方法来关闭zk客户端与zk服务器的连接。
3、ZooKeeper类中的create方法
create方法用于创建znode节点,有多个重载方法,上面例子中用到的方法定义如下:
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException;
该方法有4个参数:
1)第1个参数是待创建的znode的路径
2)第2个参数是znode存储的数据,本例中传入null
3)第3个参数是ACL列表,本例传入的常量表示该节点的权限不受控制。
4)第4个参数是创建的节点类型,本例传入的常量表示创建的是持久节点。
上面代码的工程只要引入适当的Jar包(见前面章节介绍),就可以直接在IDE中编译和运行。
四、删除znode节点
ZooKeeper类提供的delete方法可以删除指定的znode节点。delete方法定义如下:
void delete(String path, int version) throws InterruptedException,
KeeperException;
该方法有两个参数,第1个参数是待删除的znode的路径,第2个参数是版本号。
1)如果version参数指定的值与待删除的znode的dataVersion不一致,则无法删除,会抛异常。可以传入值-1,则不管版本号是多少都会删除。
2)如果待删除的znode不存在,则delete方法会抛异常。
3)如果待删除znode下有子节点,无法删除,会抛异常
例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):
public void action() throws KeeperException, InterruptedException{
zk.delete("/zoo/hello", 0);
}
上面代码的含义是删除数据版本号为0的节点路径为”/zoo/hello”的znode。
如果该znode上设置了观察,则执行delete操作后,会触发观察。
五、设置znode存储的数据
ZooKeeper类提供的setData方法可以给znode设置要存储的数据。setData方法定义如下:
Stat setData(String path, byte[] data, int version) throws
KeeperException, InterruptedException;
该方法有3个参数,第1个参数是待设置的znode的路径,第2个参数是待设置的内容,第3个参数是版本号。
如果version参数指定的值与待设置的znode的dataVersion不一致,则无法设置,会抛异常。可以传入值-1,则不管版本号是多少都会被设置。
如果待设置znode不存在,会抛异常。
该方法有返回值,返回的是待设置节点的元数据信息。
例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):
public void action() throws KeeperException, InterruptedException{
byte[] data = "adad".getBytes();
Stat stat = zk.setData("/zoo", data , -1);
}
如果该znode上设置了观察,则执行setData操作后,会触发观察。
六、测试znode是否存在
ZooKeeper类提供的exists方法可以检查指定的znode是否存在,如果不存在,返回null;如果存在返回封装有znode元数据的Stat对象。exists方法有两个重载的方法,分别定义如下:
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException;
上面方法的第1个参数是待检查znode的路径;第2个参数是设置观察触发器,如果该节点被创建、删除或其数据被修改,设置的观察将被触发。
public Stat exists(String path, boolean watch) throws
KeeperException,InterruptedException;
上面方法的第1个参数是待检查znode的路径;第2个参数是个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
七、查看znode子节点
ZooKeeper类提供的getChildren方法可以获取指定znode下的子节点列表的名称(注意不是绝对路径)以及返回znode的状态。getChildren方法有多个重构的方法,分别定义如下(省略了异常抛出的签名):
List<String> getChildren(final String path, Watcher watcher);
List<String> getChildren(String path, boolean watch);
List<String> getChildren(final String path, Watcher watcher,Stat stat);
List<String> getChildren(String path, boolean watch, Stat stat);
1)path参数是待查看的znode的路径。
2)wathcer参数是为该znode设置观察触发器,如果该znode被删除,或znode有子节点被创建和删除,则该wathcer会被触发。
3)watch参数是一个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
4)stat参数是用于返回该znode的元数据信息。
注意,如果传入的znode路径不存在,则getChildren方法会抛出异常。
例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):
public void action() throws KeeperException, InterruptedException{
List<String> childern = zk.getChildren("/", false);
for(String path:childern){
System.out.println(path);
}
}
八、获取znode存储的数据
ZooKeeper类提供的getData方法可以获取指定znode存储的数据以及返回znode的状态信息。getDatashuju方法有多个重构的方法,分别定义如下(省略了异常抛出的签名):
byte[] getData(final String path, Watcher watcher, Stat stat);
byte[] getData(String path, boolean watch, Stat stat);
1)path参数是待获取的znode的路径。
2)wathcer参数是为该znode设置观察触发器,如果该znode被删除,或其数据被更新,则该wathcer会被触发。
3)watch参数是一个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
4)stat参数是用于返回该znode的元数据信息。
注意,如果传入的znode路径不存在,则getData方法会抛出异常。
九、异步API
上面介绍的API都是同步的,即操作的返回结果通过API调用的返回值或参数直接返回。不过zk提供的API还提供了异步操作的API。因为所有异步操作的结果是通过回调函数来传送的,因此异步API方法的返回类型都是void,且没有异常抛出,错误信息通过回调函数的参数获取。
下面我们通过exists方法来举例说明,异步exists方法的定义如下:
void exists(String path, boolean watch, StatCallback cb, Object ctx);
该方法有4个参数,含义如下:
1)path参数,待操作的znode的路径
2)watch参数,表示在节点上是否设置观察。具体含义同前面的介绍
3)cb参数,回调函数,下面再详细介绍
4)ctx参数,调用exists方法额外传入的参数,可用于被在回调函数中区分不同请求。
异步方法传入的回调是StatCallback 接口的实现,StatCallback 接口的定义如下:
interface StatCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, Stat stat);
}
processResult就是被回调的方法,该方法有4个参数,含义如下:
1)rc参数表示zx操作出错的异常错误码,如果为0,表示操作成功,每个非0值,代表一种异常。
2)path参数,对应调用exists方法传入的参数,可用于识别这个回调所响应的请求。
3)ctx参数,也是对应调用exists方法传入的参数,可用于在path不足识别请求时额外传入的值,如果不需要,调用exists方法时可传入null。
4)stat参数,查询获取的结果,类似同步exists方法的返回值。
需要注意的是,调用异步方法时(如调用异步的exists方法),异步方法会立即返回。StatCallback 接口中的processResult方法被回调是在另外的线程中处理的。
下面看下例子代码:
public void action(){
zk.exists("/zoo", false, new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println(stat.getCtime());
}
}, null);
}
十、小结
上面我们介绍了zk提供的JAVA api,用于编写zk客户端程序。
我们可以看出,zk客户端程序调用exists,getData,getChildren方法时可以为节点设置观察触发器。当该节点被zk客户端程序执行create,delete,setData方法时,节点上设置的触发器会被触发。
除了上述常见的API外,还有获取和设置znode的ACL(访问控制列表)信息,这个不在文本介绍。