Redisson单进程Redis分布式乐观锁的使用与实现
本文基于Redisson 3.7.5
4. 原子锁类
Redisson中实现了两种原子锁类:RAtomicLong和RAtomicDouble,还有RLongAdder和RDoubleAdder
RAtomicDouble和RAtomicLong其实一样的,RLongAdder和RDoubleAdder其实原理也是一样的,这里我们只说RAtomicLong和RLongAdder。
4.1. RedissonAtomicLong – 基于Redis实现的原子Long类
原子类的incrementAndGet,decrementAndGet,addandGet,主要通过INCR,DECR,INCRBY,DECRBY实现,其实redis的这些操作本身就是原子性的。
@Override public RFuture<Long> getAndAddAsync(final long delta) { //getAndAdd通过INCRBY实现 return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Long>("INCRBY", new SingleConvertor<Long>() { @Override public Long convert(Object obj) { return ((Long) obj) - delta; } }), getName(), delta); } @Override public RFuture<Long> getAndSetAsync(long newValue) { //getAndSet通过GetSet实现 return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GETSET, getName(), newValue); } @Override public RFuture<Long> incrementAndGetAsync() { //incrementAndGet通过INCR实现 return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName()); } @Override public RFuture<Long> decrementAndGetAsync() { //减一通过DECR实现 return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName()); }
那么CAS更新呢?可以利用lua脚本的特性,也就是因为redis是单线程的,同时只能处理一个lua脚本,所以lua脚本具有原子性。
@Override public RFuture<Boolean> compareAndSetAsync(long expect, long update) { //CAS操作 //通过lua脚本的特性实现,lua脚本的原子性 //先检查值是否符合,如果符合再更新,返回true,否则返回false return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local currValue = redis.call('get', KEYS[1]); " + "if currValue == ARGV[1] " + "or (tonumber(ARGV[1]) == 0 and currValue == false) then " + "redis.call('set', KEYS[1], ARGV[2]); " + "return 1 " + "else " + "return 0 " + "end", Collections.<Object>singletonList(getName()), expect, update); }
4.2. RedissonLongAdder 基于Redis实现的LongAdder
在统计场景下(写多读少,且数值不用考虑并发安全),LongAdder表现比AtomicLong更好,那么基于redis是怎么实现呢?
Redisson的实现思路比较简单,本地留存一个longAdder,只有调用get或者sum的时候,才把本地的longAdder的数值加到redis中。
public class RedissonLongAdder extends RedissonBaseAdder<Long> implements RLongAdder { //利用RAtomicLong实现redis中保存的数值 private final RAtomicLong atomicLong; //本地longAdder private final LongAdder counter = new LongAdder(); }
统计但不get的操作都是对于本地longAdder操作:
@Override public void add(long x) { counter.add(x); } @Override public void increment() { add(1L); } @Override public void decrement() { add(-1L); }
与get还有sum相关的操作会把本地longAdder的数值加到redis中:
@Override protected RFuture<Long> addAndGetAsync() { return atomicLong.getAndAddAsync(counter.sum()); } @Override protected RFuture<Long> getAndDeleteAsync() { return atomicLong.getAndDeleteAsync(); } @Override public long sum() { return get(sumAsync()); }