redis.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. """
  2. Redis 连接管理模块
  3. 实现 Redis 连接池管理、健康检查和降级处理。
  4. 提供异步客户端(用于 async 路由)和同步客户端(用于同步依赖/中间件)。
  5. 需求引用: 4.1, 4.6
  6. """
  7. from typing import Optional
  8. from redis import asyncio as aioredis
  9. import redis as sync_redis
  10. import os
  11. import logging
  12. logger = logging.getLogger(__name__)
  13. class RedisManager:
  14. """Redis 连接管理器"""
  15. def __init__(self):
  16. self.redis: Optional[aioredis.Redis] = None # 异步客户端
  17. self._sync_redis: Optional[sync_redis.Redis] = None # 同步客户端
  18. self._connected = False
  19. self._sync_connected = False
  20. async def connect(self) -> bool:
  21. """建立异步 Redis 连接"""
  22. if self._connected and self.redis:
  23. return True
  24. redis_url = os.getenv("REDIS_URL", "redis://192.168.0.3:6379")
  25. redis_password = os.getenv("REDIS_PASSWORD", "")
  26. pool_size = int(os.getenv("REDIS_POOL_SIZE", "50"))
  27. try:
  28. self.redis = await aioredis.from_url(
  29. redis_url,
  30. password=redis_password if redis_password else None,
  31. encoding="utf-8",
  32. decode_responses=True,
  33. max_connections=pool_size
  34. )
  35. await self.redis.ping()
  36. self._connected = True
  37. logger.info(f"Redis 异步连接成功: {redis_url}")
  38. # 同时建立同步连接(供同步代码使用)
  39. self._init_sync_client(redis_url, redis_password, pool_size)
  40. return True
  41. except Exception as e:
  42. logger.warning(f"Redis 连接失败,系统将降级运行: {e}")
  43. self.redis = None
  44. self._connected = False
  45. return False
  46. def _init_sync_client(self, redis_url: str, password: str, pool_size: int):
  47. """初始化同步 Redis 客户端"""
  48. try:
  49. self._sync_redis = sync_redis.from_url(
  50. redis_url,
  51. password=password if password else None,
  52. encoding="utf-8",
  53. decode_responses=True,
  54. max_connections=max(10, pool_size // 5), # 同步连接池小一些
  55. )
  56. self._sync_redis.ping()
  57. self._sync_connected = True
  58. logger.info("Redis 同步连接成功")
  59. except Exception as e:
  60. logger.warning(f"Redis 同步连接失败: {e}")
  61. self._sync_redis = None
  62. self._sync_connected = False
  63. async def close(self):
  64. """关闭 Redis 连接"""
  65. if self.redis:
  66. try:
  67. await self.redis.close()
  68. logger.info("Redis 异步连接已关闭")
  69. except Exception as e:
  70. logger.warning(f"关闭 Redis 异步连接时出错: {e}")
  71. finally:
  72. self.redis = None
  73. self._connected = False
  74. if self._sync_redis:
  75. try:
  76. self._sync_redis.close()
  77. except Exception:
  78. pass
  79. finally:
  80. self._sync_redis = None
  81. self._sync_connected = False
  82. async def health_check(self) -> bool:
  83. """健康检查"""
  84. if not self.redis:
  85. return False
  86. try:
  87. await self.redis.ping()
  88. return True
  89. except Exception as e:
  90. logger.warning(f"Redis 健康检查失败: {e}")
  91. return False
  92. def get_client(self) -> Optional[aioredis.Redis]:
  93. """获取异步 Redis 客户端(用于 async 路由和服务)"""
  94. if self._connected and self.redis:
  95. return self.redis
  96. return None
  97. def get_sync_client(self) -> Optional[sync_redis.Redis]:
  98. """获取同步 Redis 客户端(用于同步依赖、中间件)"""
  99. if self._sync_connected and self._sync_redis:
  100. return self._sync_redis
  101. return None
  102. @property
  103. def is_connected(self) -> bool:
  104. return self._connected
  105. # 全局单例
  106. redis_manager = RedisManager()