转载:http://www.cnblogs.com/0201zcr/p/5942748.html
转载: http://blog.csdn.net/fengshizty/article/details/53561562
转载:http://www.hollischuang.com/archives/1716
转载:http://www.cnblogs.com/zhongkaiuu/p/redisson.html
转载:https://yq.aliyun.com/articles/60663
转载:http://blog.csdn.net/josn_hao/article/details/78412694
一 分布式锁
由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题,那么就要利用分布式锁来解决这些问题。
分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
1. 乐观锁和悲观锁
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁
2. 分布式锁的一般实现方式
针对分布式锁的实现,目前比较常用的有以下几种方案:
- 基于数据库实现分布式锁
- 基于缓存(redis,memcached,tair)实现分布式锁
- 基于Zookeeper实现分布式锁
1 基于数据库实现分布式锁
基于数据库实现的分布式锁分为行锁(for update实现的record lock)和排他锁/表锁(添加唯一标识子段)的方式实现。
实现原理
要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。创建这样一张数据库表:
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
当我们想要锁住某个方法时,执行以下SQL:
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
delete from methodLock where method_name ='method_name'
除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。可以通过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:、
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name=xxx for update;
if(result==null){
return true;
}
}catch(Exception e){
}
sleep(1000);
}
return false;
}
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加
排他锁(这里再多提一句,InnoDB引擎在加锁的时候,只有
通过索引进行检索的时候才会使用行级锁,否则会使用
表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成
唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
执行完方法之后,再通过以下方法解锁:
public void unlock(){
connection.commit();
}
优点
直接借助数据库,容易理解。
缺点
会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。
操作数据库需要一定的开销,性能问题需要考虑。(sql超时异常的问题{框架层的事务超时/jdbc的查询超时/Socket的读超时})
使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候。
2 基于缓存实现分布式锁
memcached锁:
实现原理
memcached带有add函数,利用add函数的特性即可实现分布式锁。add和set的区别在于:如果多线程并发set,则每个set都会成功,但最后存储的值以最后的set的线程为准。而add的话则相反,add会添加第一个到达的值,并返回true,后续的添加则都会返回false。利用该点即可很轻松地实现分布式锁。
优点
并发高效。
缺点
(1)memcached采用列入LRU置换策略,所以如果内存不够,可能导致缓存中的锁信息丢失。
(2)memcached无法持久化,一旦重启,将导致信息丢失。
(3)通过超时时间来控制锁的失效时间并不是十分的靠谱。
3 基于zookeeper实现分布式锁
实现原理:
基于zookeeper瞬时有序节点实现的分布式锁,大致思想即为:每个客户端对某个功能加锁时,在zookeeper上的与该功能对应的指定节点的目录下,生成一个唯一的瞬时有序节点。判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
优点
锁安全性高,zk可持久化
缺点
性能开销比较高。因为其需要动态产生、销毁瞬时节点来实现锁功能。
实现
可以直接采用zookeeper第三方库curator即可方便地实现分布式锁。以下为基于curator实现的zk分布式锁核心代码:
3. 分布式锁的适用场景举例
场景一: 比如分配任务场景。在这个场景中,由于是公司的业务后台系统,主要是用于审核人员的审核工作,并发量并不是很高,而且任务的分配规则设计成了通过审核人员每次主动的请求拉取,然后服务端从任务池中随机的选取任务进行分配。这个场景看到这里你会觉得比较单一,但是实际的分配过程中,由于涉及到了按用户聚类的问题,所以要比我描述的复杂,但是这里为了说明问题,大家可以把问题简单化理解。那么在使用过程中,主要是为了避免同一个任务同时被两个审核人员获取到的问题。我最终使用了基于数据库资源表的分布式锁来解决的问题。
场景二: 比如支付场景。在这个场景中,我提供给用户三个用于保护用户隐私的手机号码(这些号码是从运营商处获取的,和真实手机号码看起来是一样的),让用户选择其中一个进行购买,用户购买付款后,我需要将用户选择的号码分配给用户使用,同时也要将没有选择的释放掉。在这个过程中,给用户筛选的号码要在一定时间内(用户筛选正常时间范围内)让当前用户对这个产品具有独占性,以便保证付款后是100%可以拿到;同时由于产品资源池的资源有限,还要保持资源的流动性,即不能让资源长时间被某个用户占用着。对于服务的设计目标,一期项目上线的时候至少能够支持峰值qps为300的请求,同时在设计的过程中要考虑到用户体验的问题。我最终使用了memecahed的add()方法和基于数据库资源表的分布式锁来解决的问题。
场景三: 我有一个数据服务,每天调用量在3亿,每天按86400秒计算的qps在4000左右,由于服务的白天调用量要明显高于晚上,所以白天下午的峰值qps达到6000的,一共有4台服务器,单台qps要能达到3000以上。我最终使用了redis的setnx()和expire()的分布式锁解决的问题。
场景四:场景一和场景二的升级版。在这个场景中,不涉及支付。但是由于资源分配一次过程中,需要保持涉及一致性的地方增加,而且一期的设计目标要达到峰值qps500,所以需要我们对场景进一步的优化。我最终使用了redis的setnx()、expire()和基于数据库表的分布式锁来解决的问题。
转载:http://www.cnblogs.com/PurpleDream/p/5559352.html
二 Redis分布式锁的实现原理:setnx/getset
1)setNX(SET if Not eXists)
语法:SETNX key value
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写,其操作为:将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。
返回值:
设置成功,返回 1 。
设置失败,返回 0 。
所以我们使用执行下面的命令:
SETNX lock.foo <current Unix time + lock timeout + 1>
如返回1,则该客户端获得锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。
如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。
2)getSET
语法:GETSET key value
将给定 key 的值设为 value ,并返回 key 的旧值(old value)。当 key 存在但不是字符串类型时,返回一个错误。
返回值:
返回给定 key 的旧值。
当 key 没有旧值时,也即是, key 不存在时,返回 nil 。
3)get
语法:GET key
返回值:
当 key 不存在时,返回 nil ,否则,返回 key 的值。
如果 key 不是字符串类型,那么返回一个错误
三 Redis分布式锁基本方式
redis通常可以使用setnx来实现分布式锁。
1. 获取锁
public static void lock(Jedis jedis, String lockKey, String requestId, int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}
}
2. 释放锁
public static void releaselock1(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}
setnx来创建一个key,如果key不存在则创建成功返回1,如果key已经存在则返回0。依照上述来判定是否获取到了锁。获取到锁的执行业务逻辑,完毕后删除lock_key,来实现释放锁,其他未获取到锁的则进行不断重试,直到自己获取到了锁。
上述逻辑在正常情况下是OK的,但是一旦获取到锁的客户端挂了,没有执行上述释放锁的操作,则其他客户端就无法获取到锁了。
四 Redis分布式锁的实现
简单方式实现的分布式锁在客户端掉线时无法释放资源,所以在这种情况下有2种方式来解决:
- 为lock_key设置一个过期时间
- 对lock_key的value进行判断是否过期
以第一种为例,在set键值的时候带上过期时间,即使挂了,也会在过期时间之后,其他客户端能够重新竞争获取锁。
public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
可以看到,我们加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:
第一个为key,我们使用key来当锁,因为key是唯一的。
第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用
UUID.randomUUID().toString()方法生成。
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数相呼应,代表key的过期时间。
总的来说,执行上面的set()方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行
加锁操作,并对锁设置个
有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。
以第二种为例,一旦发现lock_key的值已经小于当前时间了,说明该key过期了,然后对该key进行getset设置,一旦getset返回值是原来的过期值,说明当前客户端是第一个来操作的,代表获取到了锁,一旦getset返回值不是原来过期时间则说明前面已经有人修改了,则代表没有获取到锁,详细见用Redis实现分布式锁,改正如下:
# get lock
lock = 0
while lock != 1:
timestamp = current_unix_time + lock_timeout
lock = SETNX lock.foo timestamp
if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)):
break;
else:
sleep(10ms)
# do your job
do_job()
# release
if now() < GET lock.foo:
DEL lock.foo
lock timeout的存在也使得失去了锁的意义,即存在并发的现象。一旦出现锁的租约时间,就意味着获取到锁的客户端
必须在租约之内执行完毕业务逻辑,一旦业务逻辑执行时间过长,租约到期,就会引发
并发问题。所以有lock timeout的可靠性并不是那么的高。
对于请求锁的客户端而言,如何才能知道锁被释放了呢?实现方式一般有2种情况:
1 没有获取到锁的客户端不断尝试获取锁
2 服务器端通知客户端锁被释放了
当然第二种情况是最优的(客户端所做的无用功最少),如ZooKeeper通过注册watcher来得到锁释放的通知。而数据库、redis没有办法来通知客户端锁释放了,那客户端就只能傻傻的不断尝试获取锁了。
锁的删除;
public class RedisTool {
private static final Long RELEASE_SUCCESS = 1L;
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
第一行代码,我们写了一个简单的Lua脚本代码。
首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为
lockKey,ARGV[1]赋值为
requestId。eval()方法是将Lua代码交给Redis服务端执行。
五 RedLock
RedLock是Redis官方给出的分布式锁原理。官方文档描述:”有很多三方库和文章描述如何用Redis实现一个分布式锁管理器,但是这些库实现的方式差别很大,而且很多简单的实现其实只需采用稍微增加一点复杂的设计就可以获得更好的可靠性。 这篇文章的目的就是尝试提出一种官方权威的用Redis实现分布式锁管理器的算法,我们把这个算法称为RedLock,我们相信这个算法会比一般的普通方法更加安全可靠。“
为什么基于故障切换的方案不够好
为了理解我们想要提高的到底是什么,我们先看下当前大多数基于Redis的分布式锁三方库的现状。 用Redis来实现分布式锁最简单的方式就是在实例里创建一个键值,创建出来的键值一般都是有一个超时时间的(这个是Redis自带的超时特性),所以每个锁最终都会释放。而当一个客户端想要释放锁时,它只需要删除这个键值即可。 表面来看,这个方法似乎很管用,但是这里存在一个问题:
在我们的系统架构里存在一个单点故障,如果Redis的master节点宕机了怎么办呢?
有人可能会说:加一个slave节点!在master宕机时用slave就行了!但是其实这个方案明显是不可行的,因为这种方案无法保证第1个安全互斥属性,因为Redis的复制是异步的。 总的来说,这个方案里有一个明显的竞争条件(race condition),举例来说:
- 客户端A在master节点拿到了锁。
- master节点在把A创建的key写入slave之前宕机了。
- slave变成了master节点
- B也得到了和A还持有的相同的锁(因为原来的slave里还没有A持有锁的信息)
当然,在某些特殊场景下,前面提到的这个方案则完全没有问题,比如
在宕机期间,多个客户端允许同时都持有锁,如果你可以容忍这个问题的话,那用这个基于复制的方案就完全没有问题,否则的话我们还是建议你采用这篇文章里接下来要描述的方案。
Redlock算法
在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调算法。我们已经描述了如何在单节点环境下安全地获取和释放锁。因此我们理所当然地应当用这个方法在每个单节点里来获取和释放锁。在我们的例子里面我们把N设成5,这个数字是一个相对比较合理的数值,因此我们需要在不同的计算机或者虚拟机上运行5个master节点来保证他们大多数情况下都不会同时宕机。一个客户端需要做如下操作来获取锁:
1.获取当前时间(单位是毫秒)。
2.轮流用相同的key和随机值在N个节点上请求锁,在这一步里,客户端在每个master上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10秒钟,那每个节点锁请求的超时时间可能是5-50毫秒的范围,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,我们应该尽快尝试下一个master节点。
3.客户端计算第二步中获取锁所花的时间,只有当客户端在大多数master节点上成功获取了锁(在这里是3个),而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。
4.如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。
5.如果锁获取失败了,不管是因为获取成功的锁不超过一半(N/2+1)还是因为总消耗时间超过了锁释放时间,客户端都会到每个master节点上释放锁,即便是那些他认为没有获取成功的锁。
RedLock实现
redisson是redis官网推荐的java语言实现分布式锁的项目。Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。Redisson提供了分布式对象/分布式集和/分布式锁和分布式服务。
redisson支持4种链接redis的方式:
- Cluster(集群)
- Sentinel servers(哨兵)
- Master/Slave servers(主从)
- Single server(单机)
使用redisson实现分布式锁可以通过简单的配置和使用两部分完成:
1、RedissonManager类,管理redisson的初始化等操作。
public class RedissonManager {
private static final String RAtomicName = "genId_";
private static Config config = new Config();
private static Redisson redisson = null;
public static void init(){
try {
config.useClusterServers() //这是用的集群server
.setScanInterval(2000) //设置集群状态扫描时间
.setMasterConnectionPoolSize(10000) //设置连接数
.setSlaveConnectionPoolSize(10000)
.addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");
redisson = Redisson.create(config);
//清空自增的ID数字
RAtomicLong atomicLong = redisson.getAtomicLong(RAtomicName);
atomicLong.set(1);
}catch (Exception e){
e.printStackTrace();
}
}
public static Redisson getRedisson(){
return redisson;
}
/** 获取redis中的原子ID */
public static Long nextID(){
RAtomicLong atomicLong = getRedisson().getAtomicLong(RAtomicName);
atomicLong.incrementAndGet();
return atomicLong.get();
}
}
2. DistributedRedisLock类,提供锁和解锁方法
public class DistributedRedisLock {
private static Redisson redisson = RedissonManager.getRedisson();
private static final String LOCK_TITLE = "redisLock_";
public static void acquire(String lockName){
String key = LOCK_TITLE + lockName;
RLock mylock = redisson.getLock(key);
mylock.lock(2, TimeUnit.MINUTES); //lock提供带timeout参数,timeout结束强制解锁,防止死锁
System.err.println("======lock======"+Thread.currentThread().getName());
}
public static void release(String lockName){
String key = LOCK_TITLE + lockName;
RLock mylock = redisson.getLock(key);
mylock.unlock();
System.err.println("======unlock======"+Thread.currentThread().getName());
}
}
3.
测试
private static void redisLock(){
RedissonManager.init(); //初始化
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
String key = "test123";
DistributedRedisLock.acquire(key);
Thread.sleep(1000); //获得锁之后可以进行相应的处理
System.err.println("======获得锁后进行相应的操作======");
DistributedRedisLock.release(key);
System.err.println("=============================");
} catch (Exception e) {
e.printStackTrace();
}
}
});
t.start();
}
}
测试结果:
======lock======Thread-91
======获得锁后进行相应的操作======
======unlock======Thread-91
=============================
======lock======Thread-63
======获得锁后进行相应的操作======
======unlock======Thread-63
=============================
======lock======Thread-31
======获得锁后进行相应的操作======
======unlock======Thread-31
=============================
======lock======Thread-97
======获得锁后进行相应的操作======
======unlock======Thread-97
=============================
======lock======Thread-8
======获得锁后进行相应的操作======
======unlock======Thread-8
=============================