Redis分布式锁

一般来说,在对数据进行“加锁”时,程序首先需要通过获取(acquire)锁来得到对数据进行排他性访问的能力,然后才能对数据执行一系列操作,最后还要释放(release)给其他程序。对于能够被多个线程访问的共享内存数据结构(shared-memory data structure)来说,这种“先获取锁,然后执行操作,最后释放锁”的动作非常常见。Redis使用WATCH命令来代替对数据进行加锁,因为WATCH只会在数据被其他客户端抢先修改了的情况下通知执行了这个命令的客户端,而不会阻止其他客户端对数据的修改,所以这个命令被称为乐观锁(optimistic locking)
分布式锁也有类似的“首先获取锁,然后执行操作,最后释放锁”动作,但这种锁既不是给同一个进程中的多个线程使用,也不是给同一台机器上的多个进程使用,而是由不同机器上的不同Redis客户端进行获取和释放的。

为了防止客户端在取得锁之后崩溃,并导致锁一直处于“已被获取”的状态,最终版的锁实现将带有超时限制特性:如果获得锁的进程未能在指定的时限内完成操作,那么锁将自动释放。

导致锁出现不正确行为的原因,以及锁在不正确运行时的症状:
持有锁的进程因为操作时间过长而导致锁被自动释放,但进程本身并不知晓这一点,甚至还可能会错误地释放掉了其他进程持有的锁。
一个持有锁并打算执行长时间操作的进程已经崩溃,但其他想要获取锁的进程不知道哪个进程持有着锁,也无法检测出持有锁的进程已经崩溃,只能白白地浪费时间等待锁被释放。
在一个进程持有的锁过期之后,其他多个进程同时尝试去获取锁,并且都获得了锁。
上面第一种情况和第三种情况同时出现,导致有多个进程获得了锁,而每个进程都以为自己是唯一一个获得锁的进程。

简易锁
为了对数据进行排他性访问,程序首先要做的就是获取锁。SETNX命令天生就适合用来实现锁的获取功能,这个命令只会在键不存在的情况下为键赋值,而锁要做的就是将一个随机生成的128位UUID设置为键的值,并使用这个值来防止锁被其他进程取得。
如果程序尝试获取锁的时候失败,那么它将不断地进行重试,直到成功地取得锁或者超过给定的时限为止。

def acquire_lock(conn, lockname, acquire_timeout=10):
    identifier = str(uuid.uuid4()) //128位随机标识符
    
    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx('lock:' + lockname, identifier): //尝试获取锁
            return identifier
        time.sleep(.001)
    
    return False

下面代码展示了使用锁重新实现的商品购买操作:程序首先对市场进行加锁,接着检查商品的价格,并在确保买家有足够的钱来购买商品之后,对钱和商品进行相应的转移。当操作执行完之后,程序就会释放锁。

def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
    buyer = "users:%s"%buyerid
    sellerid = "users:%s"%sellerid
    item = "%s.%s"%(itemid, sellerid)
    inventory = "inventory:%s"%buyerid
    
    locked = acquire_lock(conn, market)
    if not locked:
        return False
    
    pipe = conn.pipeline(True)
    try://检查指定的商品是否仍在出售,以及买家是否有足够的钱来购买该商品
        pipe.zscore("market:", item)
        pipe.hget(buyer, 'funds')
        price, funds = pipe.execute()
        if price is None or price > funds:
            return None
        
        pipe.hincrby(seller, 'funds', int(price))
        pipe.hincrby(buyer, 'funds', int(-price))
        pipe.sadd(inventory, itemid)
        pipe.zrem("market:", item)
        pipe.execute()
        return True
    
    finally:
        release_lock(conn, market, locked) //释放锁

上面代码的锁似乎是用来加锁整个购买操作的,但实际上这把锁是用来锁住市场数据的,它之所以会包围着执行购买操作的代码,是因为程序在操作市场数据期间必须一直持有锁。

接下面的代码release_lock函数展示了锁释放操作的实现代码:函数首先使用WATCH命令监视代表锁的键,接着检查键目前的值是否和加锁时设置的值相同,并且确认值没有变化之后删除该键(这个检查还可以防止程序错误地释放同一个锁多次)。

def release_lock(conn, lockname, identifier):
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname
    
    while True:
        try:
            pipe.watch(lockname)
            if pipe.get(lockname) == identifier:
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                return True
            
            pipe.unwatch()
            break
         except redis.exceptions.WatchError:
             pass
     
     return False

经过测试,与之前WATCH实现相比,锁实现的上架商品数量虽然有所减少,但是在买入商品时却不需要进行重试,并且上架商品数量和买入商品数量之间的比率,也跟卖家数量和买家数量之间的比率接近。

带有超时限制的锁
目前的锁实现在持有者崩溃的时候不会自动释放,这将导致锁一直处于已被获取的状态。为了解决这个问题,我们将为锁加上超时功能。

为了给锁加上超时的限制特性,程序将在取得锁之后,调用EXPIRE命令来为锁设置过期时间,使得Redis可以自动删除超时的锁。为了确保锁在客户端已经崩溃(客户端在执行介于SETNX和EXPIRE之间的时候崩溃是最糟糕的)的情况下仍然能够自动被释放,客户端会尝试获取锁失败之后,检查锁的超时时间,并为未设置超时时间的锁设置超时时间。因为锁总会带有超时时间,并最终因为超时而自动被释放,使得其他客户端可以继续尝试获取已被释放的锁。

需要注意的一点是,因为多个客户端在同一时间内设置的超时时间基本上都是相同的,所以即使有多个客户端同时为同一个锁设置超时时间,锁的超时时间也不会产生太大变化。

def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):
    identifier = str(uuid.uuid4())
    lockname = 'lock:' + lockname
    lock_timeout = int(math.ceil(lock_timeout))
    
    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx(lockname, identifier):
            conn.expire(lockname, lock_timeout)
            return identifier
        elif not conn.ttl(lockname):
            conn.expire(lockname, lock_timeout)
        
        time.sleep(.001)
     
     return False
    原文作者:parvin
    原文地址: https://segmentfault.com/a/1190000013659099
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞