| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- """
- Redis 连接管理模块
- 实现 Redis 连接池管理、健康检查和降级处理。
- 提供异步客户端(用于 async 路由)和同步客户端(用于同步依赖/中间件)。
- 需求引用: 4.1, 4.6
- """
- from typing import Optional
- from redis import asyncio as aioredis
- import redis as sync_redis
- import os
- import logging
- logger = logging.getLogger(__name__)
- class RedisManager:
- """Redis 连接管理器"""
- def __init__(self):
- self.redis: Optional[aioredis.Redis] = None # 异步客户端
- self._sync_redis: Optional[sync_redis.Redis] = None # 同步客户端
- self._connected = False
- self._sync_connected = False
- async def connect(self) -> bool:
- """建立异步 Redis 连接"""
- if self._connected and self.redis:
- return True
- redis_url = os.getenv("REDIS_URL", "redis://192.168.0.3:6379")
- redis_password = os.getenv("REDIS_PASSWORD", "")
- pool_size = int(os.getenv("REDIS_POOL_SIZE", "50"))
- try:
- self.redis = await aioredis.from_url(
- redis_url,
- password=redis_password if redis_password else None,
- encoding="utf-8",
- decode_responses=True,
- max_connections=pool_size
- )
- await self.redis.ping()
- self._connected = True
- logger.info(f"Redis 异步连接成功: {redis_url}")
- # 同时建立同步连接(供同步代码使用)
- self._init_sync_client(redis_url, redis_password, pool_size)
- return True
- except Exception as e:
- logger.warning(f"Redis 连接失败,系统将降级运行: {e}")
- self.redis = None
- self._connected = False
- return False
- def _init_sync_client(self, redis_url: str, password: str, pool_size: int):
- """初始化同步 Redis 客户端"""
- try:
- self._sync_redis = sync_redis.from_url(
- redis_url,
- password=password if password else None,
- encoding="utf-8",
- decode_responses=True,
- max_connections=max(10, pool_size // 5), # 同步连接池小一些
- )
- self._sync_redis.ping()
- self._sync_connected = True
- logger.info("Redis 同步连接成功")
- except Exception as e:
- logger.warning(f"Redis 同步连接失败: {e}")
- self._sync_redis = None
- self._sync_connected = False
- async def close(self):
- """关闭 Redis 连接"""
- if self.redis:
- try:
- await self.redis.close()
- logger.info("Redis 异步连接已关闭")
- except Exception as e:
- logger.warning(f"关闭 Redis 异步连接时出错: {e}")
- finally:
- self.redis = None
- self._connected = False
- if self._sync_redis:
- try:
- self._sync_redis.close()
- except Exception:
- pass
- finally:
- self._sync_redis = None
- self._sync_connected = False
- async def health_check(self) -> bool:
- """健康检查"""
- if not self.redis:
- return False
- try:
- await self.redis.ping()
- return True
- except Exception as e:
- logger.warning(f"Redis 健康检查失败: {e}")
- return False
- def get_client(self) -> Optional[aioredis.Redis]:
- """获取异步 Redis 客户端(用于 async 路由和服务)"""
- if self._connected and self.redis:
- return self.redis
- return None
- def get_sync_client(self) -> Optional[sync_redis.Redis]:
- """获取同步 Redis 客户端(用于同步依赖、中间件)"""
- if self._sync_connected and self._sync_redis:
- return self._sync_redis
- return None
- @property
- def is_connected(self) -> bool:
- return self._connected
- # 全局单例
- redis_manager = RedisManager()
|