基于zookeeper分布式一致性锁

方案1:
算法思路:利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁。解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁。
特点:这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单。缺点是会产生“惊群”效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁。

《基于zookeeper分布式一致性锁》 image.png

方案2:
算法思路:临时顺序节点实现共享锁
客户端调用create()方法创建名为“locknode/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。
客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。
特点:适合小集群分布式。集群大会耗时严重。

方案3:
算法思路:临时顺序节点实现共享锁的改进实现
对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。
对于解锁操作,只需要将自身创建的节点删除即可。
特点:利用临时顺序节点来实现分布式锁机制其实就是一种按照创建顺序排队的实现。这种方案效率高,避免了“惊群”效应,多个客户端共同等待锁,当锁释放时只有一个客户端会被唤醒。

《基于zookeeper分布式一致性锁》 image.png

开源包menagerie :对方案3的一个封装,直接拿来用,貌似11年以后没更新过——》不建议使用。最新github:https://github.com/sfines/menagerie

重头戏来了,我们curator包已经封装好了,直接使用即可。————-》建议使用。具体curator实现原理+测试,后续再分析。

实现:

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);
    // 模拟只能单线程操作的资源
    public void use() throws InterruptedException
    {
        if (!inUse.compareAndSet(false, true))
        {
            // 在正确使用锁的情况下,此异常不可能抛出
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try
        {
            Thread.sleep((long) (3 * Math.random()));
        }
        finally
        {
            inUse.set(false);
        }
    }
}
public class ExampleClientThatLocks {
    private final InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;
    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName)
    {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }
    public void doWork(long time, TimeUnit unit,int raceId) throws Exception
    {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        try
        {
            System.out.println(clientName + ":"+raceId+" 已获取到互斥锁");
            resource.use(); // 使用资源
            Thread.sleep(1000 * 1);
        }
        finally
        {
            System.out.println(clientName + ":"+raceId + " 释放互斥锁");
            lock.release(); // 总是在finally中释放
        }
    }
}
public class InterProcessMutexExample {
    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>();
        for (int i = 0; i < QTY; i++) {
            CuratorFramework client = CuratorFrameworkFactory.newClient("172.20.100.185:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            clientList.add(client);
        }
        System.out.println("连接初始化完成!");
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; ++i) {
            final int index = i;
            Callable<Void> task = new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    try {
                        final ExampleClientThatLocks example = new ExampleClientThatLocks(clientList.get(index), PATH, resource, "Client " + index);
                        for (int j = 0; j < REPETITIONS; ++j) {
                            example.doWork(10, TimeUnit.SECONDS,j);
                        }
                    } catch (Throwable e) {
                        e.printStackTrace();
                    } finally {
                        CloseableUtils.closeQuietly(clientList.get(index));
                    }
                    return null;
                }
            };
            service.submit(task);
        }
        service.shutdown();
        service.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("OK!");
    }
}

基本思路:开启5个zk客户端,对应创建容量大小为5的线程池,线程池中,每个线程内执行执50次doWork,执行结果:如下

《基于zookeeper分布式一致性锁》 执行中zk中的对应五个客户端的锁

可以理解为,每个客户端对应一个分布式锁,也可以理解成唯一标志位,该表标志位主要有2部分组成:

1、路径—>zookeeper的通过树结构来存储零时值和持久值 ephemeral,Persistent 如 /root/a/b/c/**** 目录下的子节点可以通过getChildren获取,可以理解为分组的概念

2、_c_96010772-5fe8-4ae5-840e-0b3466e541f3-lock-0000000194 在该字符串中,其中有一部分为uuid,结尾部分分别有epoch和zxid组成??这个待考证,在文章
http://wely.iteye.com/blog/2362118中有解释

    原文作者:snail_knight
    原文地址: https://www.jianshu.com/p/b966e5838ac9
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞