前面文章讲解了用Redis实现分布式锁的方式:
这次我们来使用Zookeeper来实现分布式锁
核心逻辑
我们使用Zookeeper同名节点只能创建一次的特性,来实现独占锁,具体方法为:
try {
zk.create(lockNameSpace, "value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
//如果节点已经被其他创建,捕获异常
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
所有客户端用create方法创建名称为lockNameSpace变量值的节点,如果创建成功则拿到锁,创建失败抛出NodeExists
异常:
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /mylock
at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
at zookeeper.zookeeper.tttt.main(tttt.java:49)
说明获取锁失败。
具体实现
根据create方法的特性,我们来实现一个最简版的分布式锁
1.获取锁
public boolean lock() throws InterruptedException {
String path = null;
watchNode(nodeString);
while (true) {
try {
path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
System.out.println(Thread.currentThread().getName() + "请求失败");
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
System.out.println("thread is notify");
}
}
if (path!=null&&!path.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " getLock...");
return true;
}
}
}
代码逻辑:
path是create方法的返回值,用来判断是否创建成功。
while循环用来获取锁失败后不断重试
watchNode()方法用来监听nodeString的状态
watchNode:
private void watchNode(String nodeString) throws InterruptedException {
try {
zk.exists(nodeString, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("事件来了:" + watchedEvent.toString());
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
System.out.println("delete事件");
}
}
});
} catch (KeeperException e) {
e.printStackTrace();
}
}
2.释放锁
public void unlock() {
try {
zk.delete(nodeString, -1);
System.out.println(Thread.currentThread().getName() + "release Lock...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
代码很简单,删除nodeString节点,以便其它客户端来竞争锁。
3.main方法
public static void main(String args[]) throws InterruptedException {
Zookeeper_1_While test = new Zookeeper_1_While();
try {
Thread.sleep(100);
zk.create(lockNameSpace, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 4; i++) {
service.execute(() -> {
try {
test.lock();
Thread.sleep(1800);
} catch (InterruptedException e) {
e.printStackTrace();
}
test.unlock();
});
}
service.shutdown();
}
创建四个客户端来竞争锁,竞争到以后执行1.8s的业务逻辑,然后释放锁。
4.输出结果
Receive event WatchedEvent state:SyncConnected type:None path:null
connection is ok
事件来了:WatchedEvent state:SyncConnected type:NodeCreated path:/mylock/lock1
事件来了:WatchedEvent state:SyncConnected type:NodeCreated path:/mylock/lock1
事件来了:WatchedEvent state:SyncConnected type:NodeCreated path:/mylock/lock1
事件来了:WatchedEvent state:SyncConnected type:NodeCreated path:/mylock/lock1
pool-1-thread-1 拿到 Lock...
pool-1-thread-2请求失败
pool-1-thread-3请求失败
pool-1-thread-4请求失败
pool-1-thread-2请求失败
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-2请求失败
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-2请求失败
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-1释放 Lock...
pool-1-thread-2 拿到 Lock...
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-4请求失败
pool-1-thread-3请求失败
pool-1-thread-2释放 Lock...
pool-1-thread-4 拿到 Lock...
pool-1-thread-3请求失败
pool-1-thread-3请求失败
pool-1-thread-3请求失败
pool-1-thread-3请求失败
pool-1-thread-4释放 Lock...
pool-1-thread-3 拿到 Lock...
pool-1-thread-3释放 Lock...
存在的问题
通过打印的输出结果我们可以看到,同时竞争锁的客户端越多,请求失败的输出越频繁,也就是while循环去请求create方法越频繁,这会导致两个问题
1.while循环浪费cpu
2.频繁进行create重试会对Zookeeper服务器造成压力
githup代码:https://github.com/granett/zookeeper/blob/master/src/main/java/zookeeper/zookeeper/Zookeeper_1_While.java
针对以上两个问题,我们后续在下一篇Zookeeper实现分布式锁(二)Watcher版进行解决和优化。