zookeeper命令行操作
- 运行
zkCli.sh –server <ip>
进入命令行工具 - 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容:
ls /
- 创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串:
create /zk "myData"
- 我们运行 get 命令来确认 znode 是否包含我们所创建的字符串:
get /zk
- 监听这个节点的变化,当另外一个客户端改变/zk时,它会打出下面的
get /zk watch
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk
- 下面我们通过 set 命令来对 zk 所关联的字符串进行设置:
set /zk "zsl"
- 下面我们将刚才创建的 znode 删除:
delete /zk
- 删除节点:
rmr /zk
zookeeper-api应用
功能 | 描述 |
---|---|
create | 在本地目录树中创建一个节点 |
delete | 删除一个节点 |
exists | 测试本地是否存在目标节点 |
get/set data | 从目标节点上读取 / 写数据 |
get/set ACL | 获取 / 设置目标节点访问控制列表信息 |
get children | 检索一个子节点上的列表 |
sync | 等待要被传送的数据 |
demo代码
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
public class SimpleZkClient {
private static final String connectString = "192.168.127.61:2181,192.168.127.62:2181,192.168.127.63:2181";
private static final int sessionTimeout = 2000;
ZooKeeper zkClient = null;
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 数据的增删改查
*
* @throws InterruptedException
* @throws KeeperException
*/
// 创建数据节点到zk中
public void testCreate() throws KeeperException, InterruptedException {
// 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型
String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//上传的数据可以是任何类型,但都要转成byte[]
}
//判断znode是否存在
@Test
public void testExist() throws Exception{
Stat stat = zkClient.exists("/eclipse", false);
System.out.println(stat==null?"not exist":"exist");
}
// 获取子节点
@Test
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//获取znode的数据
@Test
public void getData() throws Exception {
byte[] data = zkClient.getData("/eclipse", false, null);
System.out.println(new String(data));
}
//删除znode
@Test
public void deleteZnode() throws Exception {
//参数2:指定要删除的版本,-1表示删除所有版本
zkClient.delete("/eclipse", -1);
}
//删除znode
@Test
public void setData() throws Exception {
zkClient.setData("/app1", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/app1", false, null);
System.out.println(new String(data));
}
}
Zookeeper的监听器工作机制
- 监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
- 监听器的注册是在获取数据的操作中实现:
-
getData(path,watch?)
监听的事件是:节点数据变化事件 -
getChildren(path,watch?)
监听的事件是:节点下的子节点增减变化事件
-
分布式共享锁的简单实现
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributedClientLock {
// 会话超时
private static final int SESSION_TIMEOUT = 2000;
// zookeeper集群地址
private String hosts = "mini1:2181,mini2:2181,mini3:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 记录自己创建的子节点路径
private volatile String thisPath;
/**
* 连接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 判断事件类型,此处只处理子节点变化事件
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//获取子节点,并对父节点进行监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比较是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//访问共享资源处理业务,并且在处理完成之后删除锁
doSomething();
//重新注册一把新的锁
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 1、程序一进来就先注册一把锁到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会,便于观察
Thread.sleep(new Random().nextInt(1000));
// 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果争抢资源的程序就只有自己,则可以直接去访问共享资源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 处理业务逻辑,并且在最后释放锁
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
//释放锁
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientLock dl = new DistributedClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}