import asyncio import time import uuid from typing import Optional from foundation.observability.logger.loggering import server_logger class AsyncRedisLock: def __init__(self, redis_client, lock_name: str, expire_time: int = 30): """ :param redis_client: 异步 Redis 客户端连接 :param lock_name: 锁的名称 :param expire_time: 锁的过期时间(秒) """ self.redis = redis_client self.lock_name = lock_name self.expire_time = expire_time self.identifier = str(uuid.uuid4()) # 唯一标识,用于安全释放锁 async def acquire(self, timeout: float = 10) -> bool: """ 异步获取锁 :param timeout: 获取锁的超时时间(秒) :return: 是否成功获取锁 """ end = time.time() + timeout while time.time() < end: #server_logger.info(f"尝试获取锁: {self.lock_name},{self.identifier},{self.expire_time}") # 尝试获取锁 if await self.redis.set( self.lock_name, self.identifier, nx=True, ex=self.expire_time ): return True await asyncio.sleep(0.001) # 短暂等待后重试 return False async def release(self) -> bool: """ 异步释放锁 :return: 是否成功释放锁 """ # 使用 Lua 脚本保证原子性 unlock_script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ try: # 注意这里参数传递方式与同步版本不同 result = await self.redis.eval( unlock_script, 1 , self.lock_name, self.identifier ) return bool(result) except Exception as e: print(f"Error releasing lock: {e}") return False async def __aenter__(self): if not await self.acquire(): raise Exception("Could not acquire lock") return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.release()