""" Redis 连接管理模块 实现 Redis 连接池管理、健康检查和降级处理。 提供异步客户端(用于 async 路由)和同步客户端(用于同步依赖/中间件)。 需求引用: 4.1, 4.6 """ from typing import Optional from redis import asyncio as aioredis import redis as sync_redis import os import logging logger = logging.getLogger(__name__) class RedisManager: """Redis 连接管理器""" def __init__(self): self.redis: Optional[aioredis.Redis] = None # 异步客户端 self._sync_redis: Optional[sync_redis.Redis] = None # 同步客户端 self._connected = False self._sync_connected = False async def connect(self) -> bool: """建立异步 Redis 连接""" if self._connected and self.redis: return True redis_url = os.getenv("REDIS_URL", "redis://192.168.0.3:6379") redis_password = os.getenv("REDIS_PASSWORD", "") pool_size = int(os.getenv("REDIS_POOL_SIZE", "50")) try: self.redis = await aioredis.from_url( redis_url, password=redis_password if redis_password else None, encoding="utf-8", decode_responses=True, max_connections=pool_size ) await self.redis.ping() self._connected = True logger.info(f"Redis 异步连接成功: {redis_url}") # 同时建立同步连接(供同步代码使用) self._init_sync_client(redis_url, redis_password, pool_size) return True except Exception as e: logger.warning(f"Redis 连接失败,系统将降级运行: {e}") self.redis = None self._connected = False return False def _init_sync_client(self, redis_url: str, password: str, pool_size: int): """初始化同步 Redis 客户端""" try: self._sync_redis = sync_redis.from_url( redis_url, password=password if password else None, encoding="utf-8", decode_responses=True, max_connections=max(10, pool_size // 5), # 同步连接池小一些 ) self._sync_redis.ping() self._sync_connected = True logger.info("Redis 同步连接成功") except Exception as e: logger.warning(f"Redis 同步连接失败: {e}") self._sync_redis = None self._sync_connected = False async def close(self): """关闭 Redis 连接""" if self.redis: try: await self.redis.close() logger.info("Redis 异步连接已关闭") except Exception as e: logger.warning(f"关闭 Redis 异步连接时出错: {e}") finally: self.redis = None self._connected = False if self._sync_redis: try: self._sync_redis.close() except Exception: pass finally: self._sync_redis = None self._sync_connected = False async def health_check(self) -> bool: """健康检查""" if not self.redis: return False try: await self.redis.ping() return True except Exception as e: logger.warning(f"Redis 健康检查失败: {e}") return False def get_client(self) -> Optional[aioredis.Redis]: """获取异步 Redis 客户端(用于 async 路由和服务)""" if self._connected and self.redis: return self.redis return None def get_sync_client(self) -> Optional[sync_redis.Redis]: """获取同步 Redis 客户端(用于同步依赖、中间件)""" if self._sync_connected and self._sync_redis: return self._sync_redis return None @property def is_connected(self) -> bool: return self._connected # 全局单例 redis_manager = RedisManager()