简介
我相信很多人学分布式锁最大的动力并不是他自己的系统需要,而是面试官需要。。。当然,这也侧面说明分布锁很重要,经常作为考题,在学习之前,我们要先明确几个问题。
一、锁重要吗?
当然重要,只要访问临界资源的时候,都会用到锁,要不然就会有线程安全问题。
二、那我们为什么不用Java自带的锁?比如synchronized和Lock还要自己实现呢?
这里需要的明确一个问题,这些Java自带的锁都是在同一个JVM中发挥作用的,如果是在分布式的服务中,会有多个JVM虚拟机下的服务进行并发访问,这些锁是发挥不了作用的。分布式环境下,锁需要第三方服务提供。
三、常用的分布式锁实现方案有哪些?
- 基于MySQL的分布式锁
- 基于Redis的分布式锁
- 基于ZooKeeper的分布式锁
其实说白了,只要能存储数据的数据库都能实现分布锁,因为我们只需要告诉其它线程,当前资源被占用了就可以了,其实synchronized和Lock也是存储一个标记,告知别的线程,当前资源有没有被占用而已,没什么神秘的。
三种实现方式
首先我们需要明白,分布式锁应该达到什么样的要求:
- 互斥性。(在分布式集群中,同一个方法在同一时间只能被一台机器上的一个线程获得)。
- 可重入性。(递归调用不应该被阻塞、避免死锁)。
- 锁的超时。(避免死锁、死循环等意外情况)。
- 加锁和解锁必须为同一客户端。(除非锁到期自动收回,否则加锁和解锁需为同一客户端)。
接下来简单介绍下三种锁实现原理,以及他们的优缺点。
基于MySQL的分布式锁
基于MySQL的锁主要是两种实现方式,一种是悲观锁,一种是乐观锁。
- 悲观锁:主要利用 select … where … for update 对所查行进行锁定和操作,需要注意的是“where name = lock ”,name字段必须要走索引,否则会锁表。
- 乐观锁:是基于CAS的思想,不认为锁争抢经常发生,只有update version失败后才能觉察到,锁争抢不多时,是很好的解决方案,但争抢过多,很浪费CPU资源。
优点:
- 实现相对简单,都由MySql去解决争抢问题。
- 架构相对简单,不再需要多余的三方组件,使整个系统更简单。
缺点:
- 性能相对较差,并且有锁表的风险。
- 非阻塞操作失败后,需要轮询,占用cpu资源,和MySQL数据库资源。
- 长时间不提交或者长时间轮询,会占用过多MySQL连接资源。
基于Redis的分布式锁
Redis的一些操作和特性,可能参考这篇文章:Redis学习笔记。这里直接介绍需要用到的三个命令,分别是:SETNX、expire、delete。
- SETNX key val
SETNX为SET if Not eXists缩写,作用为:若key不存在,则存入键值对,返回1;若key存在,则什么都不做,返回0。 - expire key <timeout>
为key设置一个超时时间,单位为second,超过这个时间锁会自动删除key,避免因宕机引起死锁。 - delete key
删除指定key。
实现思想为:
- 获取锁的时候,使用setnx加锁,key为锁名,value值为一个随机生成的UUID。
- 获取锁成功后,使用expire命令为锁添加一个超时时间,若超过这个时间则放弃获取锁。
- 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
优点:
- 依赖于Redis高并发的特性,所以性能极佳。
- 过期时间不好控制,需要考虑续锁问题。
缺点:
- 实现相对复杂,需要考虑的因素太多。
- 非阻塞,操作失败后,需要轮询,占用cpu资源。
- 主节点挂掉,未成功同步的情况下,可能导致多个节点获取到锁。
基于ZooKeeper的分布式锁
ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。Znode 又分为四种类型:持久节点、持久顺序节点、临时节点、临时顺序节点。
ZooKeeper 分布式锁是基于临时顺序节点来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
而且用临时顺序节点的另外一个用意是,如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。
优点:
- 有效的解决单点宕机问题,具备高可用性。
- 可做可重入锁。
- 可做阻塞锁。
缺点:
- 需要频繁的创建和删除节点,性能上不如Redis。
- 客户端与 Zookeeper 的长时间失联,锁被释放问题。
总结
从实现容易度上比较:MySQL数据库 > Zookeeper > Redis缓存。
从性能上比较:Redis缓存 > Zookeeper > MySQL数据库。
从可靠性上比较:Zookeeper > MySQL数据库 > Redis缓存。
Redis的具体实现代码
用Redis做代码实现时,我们需要考虑以下几个问题:
- 因为宕机引起的死锁问题:设置过期时间。
- 业务时间长于过期时间:开一个守护进程,做锁续期。
- 锁被别人释放:锁写入唯一标识,释放锁先检查标识,再释放。
那么这些实现起来太麻烦怎么办,直接用别人封装好的库即可:Redisson。不但更加方便,也更加稳定,代码如下:
// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例
RLock rLock = redissonClient.getLock(lockKey);
try {
// 4.尝试获取锁
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
// 获得锁成功,处理业务
}
} catch (Exception e) {
// 继续等待,或执行别的操作
throw new RuntimeException("aquire lock fail");
}finally{
// 无论如何, 最后都要解锁
rLock.unlock();
}
Redisson的实现方法如下:
tryLock实现方法如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
/**
* 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
* 当 this.await 返回 true,进入循环尝试获取锁.
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
// 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
/**
* 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
* 获取锁成功,则立马返回 true,
* 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
*/
while (true) {
long currentTime = System.currentTimeMillis();
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 超过最大等待时间则返回 false 结束循环,获取锁失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
/**
* 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
*/
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//则就在wait time 时间范围内等待可以通过信号量
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 无论是否获得锁,都要取消订阅解锁消息
unsubscribe(subscribeFuture, threadId);
}
return get(tryLockAsync(waitTime, leaseTime, unit));
}
Redisson看门狗续锁机制
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 先按照30秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
续期原理
就是用lua脚本,将锁的时间重置为30s
/*
Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,
然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 还持有锁 key
(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),
那么就会不断的延长锁 key 的生存时间。如果服务宕机了,Watch Dog 机制线程也就没有了,
此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
*/
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}