如何用Redis实现分布式锁?

简介

  我相信很多人学分布式锁最大的动力并不是他自己的系统需要,而是面试官需要。。。当然,这也侧面说明分布锁很重要,经常作为考题,在学习之前,我们要先明确几个问题。

一、锁重要吗?

  当然重要,只要访问临界资源的时候,都会用到锁,要不然就会有线程安全问题。

二、那我们为什么不用Java自带的锁?比如synchronized和Lock还要自己实现呢?

  这里需要的明确一个问题,这些Java自带的锁都是在同一个JVM中发挥作用的,如果是在分布式的服务中,会有多个JVM虚拟机下的服务进行并发访问,这些锁是发挥不了作用的。分布式环境下,锁需要第三方服务提供。

三、常用的分布式锁实现方案有哪些?
  • 基于MySQL的分布式锁
  • 基于Redis的分布式锁
  • 基于ZooKeeper的分布式锁

  其实说白了,只要能存储数据的数据库都能实现分布锁,因为我们只需要告诉其它线程,当前资源被占用了就可以了,其实synchronized和Lock也是存储一个标记,告知别的线程,当前资源有没有被占用而已,没什么神秘的。

三种实现方式

  首先我们需要明白,分布式锁应该达到什么样的要求:

  1. 互斥性。(在分布式集群中,同一个方法在同一时间只能被一台机器上的一个线程获得)。
  2. 可重入性。(递归调用不应该被阻塞、避免死锁)。
  3. 锁的超时。(避免死锁、死循环等意外情况)。
  4. 加锁和解锁必须为同一客户端。(除非锁到期自动收回,否则加锁和解锁需为同一客户端)。

接下来简单介绍下三种锁实现原理,以及他们的优缺点。

基于MySQL的分布式锁

  基于MySQL的锁主要是两种实现方式,一种是悲观锁,一种是乐观锁

  1. 悲观锁:主要利用 select … where … for update 对所查行进行锁定和操作,需要注意的是“where name = lock ”,name字段必须要走索引,否则会锁表。
  2. 乐观锁:是基于CAS的思想,不认为锁争抢经常发生,只有update version失败后才能觉察到,锁争抢不多时,是很好的解决方案,但争抢过多,很浪费CPU资源。
优点:
  • 实现相对简单,都由MySql去解决争抢问题。
  • 架构相对简单,不再需要多余的三方组件,使整个系统更简单。
缺点:
  • 性能相对较差,并且有锁表的风险。
  • 非阻塞操作失败后,需要轮询,占用cpu资源,和MySQL数据库资源。
  • 长时间不提交或者长时间轮询,会占用过多MySQL连接资源。

基于Redis的分布式锁

  Redis的一些操作和特性,可能参考这篇文章:Redis学习笔记。这里直接介绍需要用到的三个命令,分别是:SETNXexpiredelete

  1. SETNX key val
    SETNXSET if Not eXists缩写,作用为:若key不存在,则存入键值对,返回1;若key存在,则什么都不做,返回0。
  2. expire key <timeout>
    为key设置一个超时时间,单位为second,超过这个时间锁会自动删除key,避免因宕机引起死锁。
  3. delete key
    删除指定key。

实现思想为:

  1. 获取锁的时候,使用setnx加锁,key为锁名,value值为一个随机生成的UUID。
  2. 获取锁成功后,使用expire命令为锁添加一个超时时间,若超过这个时间则放弃获取锁。
  3. 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
优点:
  • 依赖于Redis高并发的特性,所以性能极佳。
  • 过期时间不好控制,需要考虑续锁问题。
缺点:
  • 实现相对复杂,需要考虑的因素太多。
  • 非阻塞,操作失败后,需要轮询,占用cpu资源。
  • 主节点挂掉,未成功同步的情况下,可能导致多个节点获取到锁。

基于ZooKeeper的分布式锁

  ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。Znode 又分为四种类型:持久节点持久顺序节点临时节点临时顺序节点
  ZooKeeper 分布式锁是基于临时顺序节点来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
  而且用临时顺序节点的另外一个用意是,如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。

优点:
  • 有效的解决单点宕机问题,具备高可用性。
  • 可做可重入锁。
  • 可做阻塞锁。
缺点:
  • 需要频繁的创建和删除节点,性能上不如Redis。
  • 客户端与 Zookeeper 的长时间失联,锁被释放问题。

总结

从实现容易度上比较:MySQL数据库 > Zookeeper > Redis缓存。
从性能上比较:Redis缓存 > Zookeeper > MySQL数据库。
从可靠性上比较:Zookeeper > MySQL数据库 > Redis缓存。

Redis的具体实现代码

  用Redis做代码实现时,我们需要考虑以下几个问题:

  • 因为宕机引起的死锁问题:设置过期时间。
  • 业务时间长于过期时间:开一个守护进程,做锁续期。
  • 锁被别人释放:锁写入唯一标识,释放锁先检查标识,再释放。

那么这些实现起来太麻烦怎么办,直接用别人封装好的库即可:Redisson。不但更加方便,也更加稳定,代码如下:

// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password").setDatabase(0);

// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);

// 3.获取锁对象实例
RLock rLock = redissonClient.getLock(lockKey);
try {
   
    // 4.尝试获取锁
    boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        // 获得锁成功,处理业务
    }
} catch (Exception e) {
    // 继续等待,或执行别的操作
    throw new RuntimeException("aquire lock fail");
}finally{
    // 无论如何, 最后都要解锁
    rLock.unlock();
}

Redisson的实现方法如下:
tryLock实现方法如下:

	public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        
        // 尝试获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();

        /**
         * 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
         * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
         * 当 this.await 返回 true,进入循环尝试获取锁.
         */
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
              }

            /**
             * 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
             * 获取锁成功,则立马返回 true,
             * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
             */
            while (true) {
                long currentTime = System.currentTimeMillis();

                // 再次尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                // 超过最大等待时间则返回 false 结束循环,获取锁失败
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                /**
                 * 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
                 */
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    //则就在wait time 时间范围内等待可以通过信号量
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 无论是否获得锁,都要取消订阅解锁消息
            unsubscribe(subscribeFuture, threadId);
        }
        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
    

Redisson看门狗续锁机制

	
	private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
		
		// 如果带有过期时间,则按照普通方式获取锁
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }

        // 先按照30秒的过期时间来执行获取锁的方法
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        
        // 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

续期原理
就是用lua脚本,将锁的时间重置为30s


/*
 Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,
 然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 还持有锁 key
 (判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),
 那么就会不断的延长锁 key 的生存时间。如果服务宕机了,Watch Dog 机制线程也就没有了,
 此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
*/
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId));
}
    原文作者:小夏陌
    原文地址: https://blog.csdn.net/qq_22136439/article/details/124494154
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞