定义代码如下
import redis
import contextlib
import pickle
import os, socket, threading
class RedisLock:
def __init__(self, lock_name, host='', port=6379, db=0):
self.lock_name = lock_name
self.redis = redis.Redis(connection_pool=redis.ConnectionPool(host=host, port=port, db=db))
def acquire_lock(self, lock_id, expire=None):
lock_id = lock_id if lock_id else self.get_lock_id()
return True if self.redis.set(self.lock_name, pickle.dumps(lock_id), nx=True, ex=expire) else False
# Above 1 line code can replace with follow codes to debug
# if self.redis.set(self.lock_name, pickle.dumps(lock_id), nx=True, ex=expire):
# print('Lock Succeed')
# return True
# else:
# print('Lock Failed')
# return False
def release_lock(self, lock_id=None):
lock_id = lock_id if lock_id else self.get_lock_id()
if lock_id == pickle.loads(self.redis.get(self.lock_name)):
self.redis.delete(self.lock_name)
# print('Unlock Succeed')
return True
else:
# print('Unlock Failed')
return False
@contextlib.contextmanager
def lock(self, lock_id=None, expire=None):
if not self.acquire_lock(lock_id, expire):
exit(0)
yield self
self.release_lock(lock_id)
def get_lock_id(self):
""" hostname+processID+threadName"""
return f'{socket.gethostname()}{os.getpid()}{threading.current_thread().name}'
调用代码如下
redis_lock = RedisLock('lockname', host='Your IP') # 第一个匿名参数必传,作为 redis的key
with redis_lock.lock() as lock:
print('You Can Do Something Here')
注意说明
1. 注释部分是我写的时候,调试用的代码,最后写完的时候都替换为简洁的语法.
2. 因为锁具有互斥特性, 所以选择 set() 的 nx 参数来实现,
nx参数:我个人一直这样记(读作 not exist) ====> 不存在
理解:
不存在则添加,存在就不添加了。
举一反三,没有锁就加个锁。有锁就不加锁了。
3. set方法 的 ex 参数, 可代替expire方法的来设置过期时间
4. redis 有 很多指令变形。 比如 set(nx=, ex=) 可拆分为 setnx setex
但 "set() 这种指令更优", 能用就尽量用, 理由如下:
"set指令 好处是 set具有原子性", 避免了解决资源竞争的同时引发自身可能出现的资源竞争
5. 我使用装饰器版 的 上下文管理器,对代码做了封装, 所以调用时,用"with语句"即可
6. with redis_lock.lock() as lock ,"lock() 这里可以自己指定2个参数":
lock_id=None # 这是区分不同线程的唯一标识符,默认为(主机名+进程ID+线程名),可自传
expire=None # 过期时间,秒为单位
7. 需要注意一个点,与redis通信是以二进制形式。 所以我在代码内部对 lock_id 做了"pickle序列化"
当然如果是字符串用 encode() 与 decode() 来实现也是可以的。