| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import asyncio
- import time
- import uuid
- from typing import Optional
- from foundation.logger.loggering import server_logger
- class AsyncRedisLock:
- def __init__(self, redis_client, lock_name: str, expire_time: int = 30):
- """
- :param redis_client: 异步 Redis 客户端连接
- :param lock_name: 锁的名称
- :param expire_time: 锁的过期时间(秒)
- """
- self.redis = redis_client
- self.lock_name = lock_name
- self.expire_time = expire_time
- self.identifier = str(uuid.uuid4()) # 唯一标识,用于安全释放锁
- async def acquire(self, timeout: float = 10) -> bool:
- """
- 异步获取锁
- :param timeout: 获取锁的超时时间(秒)
- :return: 是否成功获取锁
- """
- end = time.time() + timeout
- while time.time() < end:
- #server_logger.info(f"尝试获取锁: {self.lock_name},{self.identifier},{self.expire_time}")
- # 尝试获取锁
- if await self.redis.set(
- self.lock_name,
- self.identifier,
- nx=True,
- ex=self.expire_time
- ):
- return True
- await asyncio.sleep(0.001) # 短暂等待后重试
- return False
- async def release(self) -> bool:
- """
- 异步释放锁
- :return: 是否成功释放锁
- """
- # 使用 Lua 脚本保证原子性
- unlock_script = """
- if redis.call("get", KEYS[1]) == ARGV[1] then
- return redis.call("del", KEYS[1])
- else
- return 0
- end
- """
- try:
- # 注意这里参数传递方式与同步版本不同
- result = await self.redis.eval(
- unlock_script,
- 1 ,
- self.lock_name,
- self.identifier
- )
- return bool(result)
- except Exception as e:
- print(f"Error releasing lock: {e}")
- return False
- async def __aenter__(self):
- if not await self.acquire():
- raise Exception("Could not acquire lock")
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.release()
|