# async_redis_lock.py
  1. import asyncio
  2. import time
  3. import uuid
  4. from typing import Optional
  5. from foundation.observability.logger.loggering import server_logger
  6. class AsyncRedisLock:
  7. def __init__(self, redis_client, lock_name: str, expire_time: int = 30):
  8. """
  9. :param redis_client: 异步 Redis 客户端连接
  10. :param lock_name: 锁的名称
  11. :param expire_time: 锁的过期时间(秒)
  12. """
  13. self.redis = redis_client
  14. self.lock_name = lock_name
  15. self.expire_time = expire_time
  16. self.identifier = str(uuid.uuid4()) # 唯一标识,用于安全释放锁
  17. async def acquire(self, timeout: float = 10) -> bool:
  18. """
  19. 异步获取锁
  20. :param timeout: 获取锁的超时时间(秒)
  21. :return: 是否成功获取锁
  22. """
  23. end = time.time() + timeout
  24. while time.time() < end:
  25. #server_logger.info(f"尝试获取锁: {self.lock_name},{self.identifier},{self.expire_time}")
  26. # 尝试获取锁
  27. if await self.redis.set(
  28. self.lock_name,
  29. self.identifier,
  30. nx=True,
  31. ex=self.expire_time
  32. ):
  33. return True
  34. await asyncio.sleep(0.001) # 短暂等待后重试
  35. return False
  36. async def release(self) -> bool:
  37. """
  38. 异步释放锁
  39. :return: 是否成功释放锁
  40. """
  41. # 使用 Lua 脚本保证原子性
  42. unlock_script = """
  43. if redis.call("get", KEYS[1]) == ARGV[1] then
  44. return redis.call("del", KEYS[1])
  45. else
  46. return 0
  47. end
  48. """
  49. try:
  50. # 注意这里参数传递方式与同步版本不同
  51. result = await self.redis.eval(
  52. unlock_script,
  53. 1 ,
  54. self.lock_name,
  55. self.identifier
  56. )
  57. return bool(result)
  58. except Exception as e:
  59. print(f"Error releasing lock: {e}")
  60. return False
  61. async def __aenter__(self):
  62. if not await self.acquire():
  63. raise Exception("Could not acquire lock")
  64. return self
  65. async def __aexit__(self, exc_type, exc_val, exc_tb):
  66. await self.release()