Zookeeper实现分布式锁(二)Watcher版

在Zookeeper实现分布式锁的第一篇文章Zookeeper实现分布式锁(一)While版中,我们使用Zookeeper实现了一个分布式锁,过程原理如下:

1.所有客户端都使用create方法创建同一个名字的节点
2.创建成功的获取到锁,其余的客户端不断while循环重试
3.获取到锁的客户端执行完逻辑代码后delete节点,其余客户端继续争抢

这种方式会存在一下问题:

1.while循环浪费cpu
2.频繁进行create重试会对Zookeeper服务器造成压力

针对以上问题我们可以通过Zookeeper的watcher来解决,首先我们来看watcher的使用。

Watcher使用

在zookeeper中,watcher机制是当服务端的节点信息发生了变化时通知所有监听了该节点的客户端。其允许客户端向服务端注册一个watcher监听,当服务端的一些指定事件触发了这个watcher,就会向指定的客户端发送一个事件通知。

public static void watch() throws Exception{
        zk.create("/zx3", "aaa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.exists("/zx3", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                try {
                    System.out.println("监听的节点有变化,触发事件");
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getState());
                    System.out.println(watchedEvent.getType());
                } catch (Exception e) {
                    // TODO: handle exception
                }
                
            }
        });
        //通过删除/zx3节点来触发watcher事件
        zk.delete("/zx3", -1);
}

输出结果

监听的节点有变化,触发事件
/zx3
SyncConnected
NodeDeleted

通过watcher的这种特性我们可以避免while循环不断重试的弊端。

Watcher锁实现

根据watcher可以监听某个节点的变化这个特点,可以用来优化我们的while锁。

1.实现原理

1.所有客户端都使用create方法创建同一个名字的节点
2.创建成功的获取到锁,失败的客户端对该节点设置一个watcher
3.获取到锁的客户端delete节点,其余客户端收到watcher的通知,此时再执行一次create方法
4.1-3进行循环

上述方式避免了获取不到锁的客户端不断循环重试的风险,而是当有人释放锁的时候,通知剩下的所有人来尝试获取锁。

2.获取方法
public boolean lock() throws Exception {
        String path = null;
        while (true) {
            CountDownLatch countDownLatch =  new CountDownLatch(1);
            try {
                path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (KeeperException e) {
                System.out.println(Thread.currentThread().getName() + "  请求失败");
                
                try {
                    zk.exists(nodeString, new Watcher() {
                        @Override
                        public void process(WatchedEvent watchedEvent) {
                            System.out.println("事件来了:" + watchedEvent.toString());
                            if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                                System.out.println("delete事件");
                                countDownLatch.countDown();
                            }
                        }
                    });
                } catch (KeeperException e1) {
                    e.printStackTrace();
                }
            }
            if (path != null && !path.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " 拿到 Lock...");
                break;
            }
            countDownLatch.await();
        }
        return true;
        
}

上述代码还是使用了while循环?
代码逻辑:

1.首先尝试create节点获取锁,如果拿锁失败,就通过watcher监听nodeString节点
2.在while循环最后使用countDownLatch.await()阻止继续循环
3.直到watcher收到释放锁的通知才countDownLatch.countDown()解开阻塞,重新执行create获取锁
4.重复1-3

3.释放锁
public void unlock() {
        try {
            zk.delete(nodeString, -1);
            System.out.println(Thread.currentThread().getName() +  "释放 Lock...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
}
4.main方法
public static void main(String args[]) throws InterruptedException {
        ZookeeperLock_2_Watch test = new ZookeeperLock_2_Watch();
        try {
            Thread.sleep(100);
            zk.create(lockNameSpace, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
        } catch (InterruptedException e) {
        }
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            service.execute(() -> {
                try {
                    test.lock();
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                test.unlock();
            });
        }
        service.shutdown();
    }
5.输出结果
Receive event WatchedEvent state:SyncConnected type:None path:null
connection is ok
pool-1-thread-2 拿到 Lock...
pool-1-thread-4  请求失败
pool-1-thread-1  请求失败
pool-1-thread-3  请求失败
事件来了:WatchedEvent state:SyncConnected type:NodeDeleted path:/mylock/lock1
delete事件
事件来了:WatchedEvent state:SyncConnected type:NodeDeleted path:/mylock/lock1
delete事件
事件来了:WatchedEvent state:SyncConnected type:NodeDeleted path:/mylock/lock1
delete事件
pool-1-thread-2释放 Lock...
pool-1-thread-1 拿到 Lock...
pool-1-thread-4  请求失败
pool-1-thread-3  请求失败
事件来了:WatchedEvent state:SyncConnected type:NodeDeleted path:/mylock/lock1
delete事件
事件来了:WatchedEvent state:SyncConnected type:NodeDeleted path:/mylock/lock1
delete事件
pool-1-thread-1释放 Lock...
pool-1-thread-4 拿到 Lock...
pool-1-thread-3  请求失败
事件来了:WatchedEvent state:SyncConnected type:NodeDeleted path:/mylock/lock1
delete事件
pool-1-thread-4释放 Lock...
pool-1-thread-3 拿到 Lock...
pool-1-thread-3释放 Lock...

通过输出结果可以看到,只要有一个客户端拿到锁,其余的客户端都不会再重复请求,减少了cpu和zookeeper的压力,当释放锁时,zookeeper会通知所有监听的客户端,客户端再执行create来竞争锁,完美解决了while版本的问题。

存在的问题

通过Watcher实现的锁虽然解决了While的无限循环问题,但还是存在一个比较棘手的麻烦:

当有极多的客户端在watcher等待锁时,一旦持有锁的客户端释放,就会引起“惊群效应”,无数个客户端的请求压力直接打到zookeeper服务器上

githup代码:https://github.com/granett/zookeeper/blob/master/src/main/java/zookeeper/zookeeper/Zookeeper_2_Watcher.java

    原文作者:激情的狼王
    原文地址: https://www.jianshu.com/p/2958646df1d8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞