java实现redis分布式锁
应用场景:多并发
特点:分布式锁、动态解决由redis宕机产生死锁的情况,基于wait()、notify()有效提高效率节省资源
Junit类,其中
testTryLock
包含多线程并发测试
package com.sirding.redis;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
public class TestRedisLock {
Logger logger = Logger.getLogger(getClass());
public static JedisPool pool;
public static Jedis jedis;
static RedisConnection connection;
public static final Object LOCK = new Object();
@BeforeClass
public static void before(){
if (pool == null) {
try {
JedisPoolConfig config = new JedisPoolConfig();
//控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(300);
config.setMaxTotal(1000);
//表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(1000);
//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
pool = new JedisPool(config, "127.0.0.1", 2189);
jedis = pool.getResource();
JedisConnectionFactory jcf = new JedisConnectionFactory(config);
JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1", 2189);
jcf.setShardInfo(shardInfo);
jcf.setUsePool(true);
jcf.setHostName("127.0.0.1");
jcf.setPort(2189);
connection = jcf.getConnection();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void testAdd(){
Jedis jedis = pool.getResource();
long count = jedis.setnx("test", String.valueOf(System.currentTimeMillis()));
long time = jedis.ttl("test");
System.out.println("过期时间:" + time);
logger.debug(count);
jedis.close();
}
@Test
public void testTryLock() {
RedisLockUtil util = new RedisLockUtil();
for(int i = 0; i < 10; i++){
RedisLockThread redisLockThread = new RedisLockThread(util);
Thread thread = new Thread(redisLockThread, "LOCK_THREAD_" + i);
thread.start();
}
//监控redis连接池使用状态
new Thread(new Runnable() {
@Override
public void run() {
while(true){
System.out.println("当前存活的线程:" + pool.getNumActive());
System.out.println("当前空闲的线程:" + pool.getNumIdle());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
Thread.sleep(1000 * 3600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static AtomicInteger count = new AtomicInteger(0);
public static synchronized Jedis getJedis(){
System.out.println("获得jedis连接" + count.incrementAndGet());
return pool.getResource();
}
}
业务线程类,模拟实际系统中获得锁后的业务处理
package com.sirding.redis;
import redis.clients.jedis.Jedis;
public class RedisLockThread implements Runnable{
RedisLockUtil util = null;
RedisLockThread(RedisLockUtil util){
this.util = util;
}
@Override
public void run() {
final String key = "MY.LOCK";
final int expire = 100;
Jedis jedis = TestRedisLock.getJedis();
util.tryLock(jedis, key, expire);
try {
System.out.println("我已经拿到锁了:" + Thread.currentThread().getName());
//停留一秒代表处理业务处理逻辑
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
util.freeLock(jedis, key);
System.out.println(Thread.currentThread().getName() + ":执行结束");
}
}
//停留一秒代表处理业务处理逻辑
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
util.freeLock(jedis, key);
System.out.println(Thread.currentThread().getName() + ":执行结束");
}
}
获得Redis的分布式锁的工具类,通过更新Jedis获得方式,修改后可做为项目中实际获得分布式锁的工具类
package com.sirding.redis;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.Jedis;
/**
* 获得redis分布式锁
* @author zcd
* @since 2017年4月25日
* @version 1.1
*/
public class RedisLockUtil {
/**
* 是否有正在等待竞争锁线程
*/
private volatile boolean hasWaitThread = false;
/**
* 持有锁的线程
*/
private volatile Thread holdLockThread;
/**
* 尝试获得锁的次数
*/
private AtomicInteger tryTimes = new AtomicInteger(0);
private final int MAX_TRY_TIMES = 100;
/**
* 获得redis锁
* @author zcd
* @since 2017年4月25日
* @param key 锁的主键
* @param expire 锁的过期时间
*/
public void tryLock(String key, int expire){
this.tryLock(null, key, expire);
}
/**
* 获得redis锁
* @author zcd
* @since 2017年4月25日
* @param jedis redis连接
* @param key 锁的主键
* @param expire 锁的过期时间
*/
public void tryLock(Jedis jedis, String key, int expire){
boolean locked = (jedis == null ? getLock(key, expire) : getLock(jedis, key, expire));
while(!locked){
try {
//已经有正在等待的线程&持有锁的线程不为空
if(hasWaitThread && holdLockThread != null){
synchronized (holdLockThread) {
//等待持有锁的线程执行notifyAll
holdLockThread.wait(expire);
}
}else{
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
locked = (jedis == null ? getLock(key, expire) : getLock(jedis, key, expire));
tryTimes.incrementAndGet();
}
}
/**
* 释放redis锁,通知通知其他等待线程
* @author zcd
* @since 2017年4月25日
* @param key 锁的key
* @return
*/
public boolean freeLock(String key) {
Jedis jedis = TestRedisLock.getJedis();
try {
return jedis.del(key) > 0;
} finally{
doFinally(jedis);
}
}
/**
* 释放redis锁,同时释放jedis连接,通知通知其他等待线程
* @author zcd
* @since 2017年4月25日
* @param jedis
* @param key
* @return
*/
public boolean freeLock(Jedis jedis, String key) {
try {
return jedis.del(key) > 0;
} finally{
doFinally(jedis);
}
}
/**
* 从redis获得含有有效时间的锁
* @author zcd
* @since 2017年4月25日
* @param key 锁的主键
* @param expire 过期时间
* @return
*/
private boolean getLock(String key, int expire){
Jedis jedis = TestRedisLock.getJedis();
try {
return getLock(jedis, key, expire);
} finally{
jedis.close();
}
}
/**
* 从redis获得含有有效时间的锁
* @author zcd
* @since 2017年4月25日
* @param jedis
* @param key
* @param expire
* @return
*/
private boolean getLock(Jedis jedis, String key, int expire){
//失败:0, 成功:1
long locked = jedis.setnx(key, String.valueOf(expire));
//解决死锁
solveDeadLock(jedis, key, expire);
if(locked == 1){
//设置持有redis锁的线程&设置线程等待的标识
this.holdLockThread = Thread.currentThread();
this.hasWaitThread = true;
tryTimes.set(0);
jedis.expire(key, expire);
return true;
}
return false;
}
/**
* 解决用于redis服务器宕机发生的死锁现象,关闭jedis
* @author zcd
* @since 2017年4月25日
* @param key
* @param expire
*/
private void solveDeadLock(String key, int expire) {
Jedis jedis = TestRedisLock.getJedis();
try {
this.solveDeadLock(jedis, key, expire);
} finally{
jedis.close();
}
}
/**
* 解决用于redis服务器宕机发生的死锁现象
* @author zcd
* @since 2017年4月25日
* @param jedis redis连接
* @param key 锁的主键
* @param expire 过期时间
*/
private void solveDeadLock(Jedis jedis, String key, int expire){
//判断尝试获得锁的次数
if(tryTimes.get() < MAX_TRY_TIMES){
return;
}
if(jedis == null){
solveDeadLock(key, expire);
return;
}
Long ttl = jedis.ttl(key);
//锁的过期时间大于0 并且redis中的锁过期时间是 -1 或是小于0,那么说明此锁会产生死锁,直接删除
if(ttl < 0 || expire > 0){
jedis.del(key);
tryTimes.set(0);
jedis.expire(key, expire);
return true;
}
return false;
}
/**
* 解决用于redis服务器宕机发生的死锁现象,关闭jedis
* @author zcd
* @since 2017年4月25日
* @param key
* @param expire
*/
private void solveDeadLock(String key, int expire) {
Jedis jedis = TestRedisLock.getJedis();
try {
this.solveDeadLock(jedis, key, expire);
} finally{
jedis.close();
}
}
/**
* 解决用于redis服务器宕机发生的死锁现象
* @author zcd
* @since 2017年4月25日
* @param jedis redis连接
* @param key 锁的主键
* @param expire 过期时间
*/
private void solveDeadLock(Jedis jedis, String key, int expire){
//判断尝试获得锁的次数
if(tryTimes.get() < MAX_TRY_TIMES){
return;
}
if(jedis == null){
solveDeadLock(key, expire);
return;
}
Long ttl = jedis.ttl(key);
//锁的过期时间大于0 并且redis中的锁过期时间是 -1 或是小于0,那么说明此锁会产生死锁,直接删除
if(ttl < 0 || expire > 0){
jedis.del(key);
doReset();
}
tryTimes.set(0);
}
/**
* 执行finally中逻辑操作
* @author zcd
* @since 2017年4月25日
* @param jedis
*/
private void doFinally(Jedis jedis){
if(jedis != null){
jedis.close();
}
doReset();
}
doReset();
}
tryTimes.set(0);
}
/**
* 执行finally中逻辑操作
* @author zcd
* @since 2017年4月25日
* @param jedis
*/
private void doFinally(Jedis jedis){
if(jedis != null){
jedis.close();
}
doReset();
}
private void doReset(){
private void doReset(){
hasWaitThread = false; holdLockThread = null; System.out.println(Thread.currentThread().getName() + " : 释放锁,通知等待线程"); //唤醒所有等待锁的线程 synchronized (Thread.currentThread()) { Thread.currentThread().notifyAll(); }
}}
以上三个类导入项目中即可运行Junit的测试,查看结果。
设计思路:
获得共享锁的过程这里不再赘读,这里主要阐明,如何提升性能,节省资源以及如何解决由于redis意外宕机导致的死锁问题。
性能提升:
假设生产环境有3太服务器通过nginx做负载均衡,一般获得锁的方式都是while(true) + Thread.sleep(time)的方式,循环尝试获得锁,上述RedisLockUtil的实现中也包含了此块功能的实现,此种方式,会有一个问题,就是sleep时间非常短,而并发量大、或是获得锁业务处理不及时,都会导致CPU飙升。因为while(true)一直在执行,其中一台服务器器获得了锁,其中一个线程在执行业务逻辑,而其他尝试获得锁的线程都会和执行业务逻辑的线程争抢CPU,而且多线程间轮询执行,在切换线程间的上下文对象切换上也是相当耗时、耗资源的,因此本实现类中使用了wait(),当一个线程获得了锁,那么其他竞争锁的线程调用holdLockThread.wait(),也就说其他竞争锁的线程进行等待操作,释放当前占用的资源,重新进去等待队列,等到持有锁的线程唤醒这些等待的线程,好处可想而知,在持有锁的线程执行的业务的同时,其他竞争锁的线程既不争抢CPU,同时也暂时释放了线程占用的资源,保证了CPU不会持续飙升,同时给执行业务的线程提供了更多系统资源。业务执行完成后,释放锁,通过holdLockThread.notifyAll(),唤醒等待需队列中的资源,重新进行锁的竞争。
解决死锁:
在线程获得分布式锁时要执行jedis.expire(key, expire);保证,及时没有正确释放锁,也不会导致死锁的情况,但是程序实现的过程中获得锁和设置锁的有效时间并不是一步实现的,如代码中
//失败:0, 成功:1
long locked = jedis.setnx(key, String.valueOf(expire));
//解决死锁
solveDeadLock(jedis, key, expire);
if(locked == 1){
//设置持有redis锁的线程&设置线程等待的标识
this.holdLockThread = Thread.currentThread();
this.hasWaitThread = true;
tryTimes.set(0);
jedis.expire(key, expire);
return true;
}
若果执行到“if”位置时,redis服务挂了,也是key已经存入到了redis中,而且默认的过期时间是-1,也就是无过期限制,那么待redis重新启动后,如果没有手动清楚锁,那么凡是涉及到获得此分布式锁的操作都不能执行,因此,此处加了solveDeadLock(jedis, key, expire);解决死锁的逻辑,实现很简单,就是通过jredis获得key对应的过期时间如果过期时间是-1并设置的有效锁的过期时间expire> 0 标识此锁就是个有问题的锁,将其删除,并还原相关参数。这里做了一个MAX_TRY_TIMES = 100,这个值可按需求更新,其目的是,毕竟宕机的情况是小概率事件,但是每次通过程序判断锁是否是死锁确实经常要执行操作,此处也注重考虑是否要加上处理的“死锁”的逻辑,此时可通过定义的MAX_TRY_TIME=100跳过死锁的检查、处理,即尝试获得100次进行一次死锁的判断,同时将计数器置为“0”,每次获得锁后也会将tryTimes置为0,防止频繁进行死锁的校验,以便节省资源提高效率。
PS:实现过程,一定注意wait()、notifyAll()的应用,特别是其持有的对象锁的理解,其次,一定保证RedisLockUtil是单例的。在实际的项目它可能作为一个RedisService中一个接口出现整个项目中,按项目中使用情况,动态更新即可。