# !/usr/bin/python
# -*- coding: utf-8 -*-
'''
@Project    : lq-agent-api
@File       :redis_connection.py.py
@IDE        :PyCharm
@Author     :
@Date       :2025/7/21 15:07
'''
import redis  # synchronous client only

# Prefer the asyncio client bundled with redis-py; fall back to aioredis.
try:
    from redis import asyncio as redis_asyncio
except ImportError:
    try:
        import aioredis as redis_asyncio
    except ImportError:
        raise ImportError("Neither redis.asyncio nor aioredis is available. Please install 'redis[asyncio]' or 'aioredis'")

# Redis exception classes
from redis.exceptions import ConnectionError as redis_ConnectionError
from typing import Any, Dict, List, Optional, Protocol, Set, Tuple
from functools import wraps
import asyncio
from foundation.infrastructure.cache.redis_config import RedisConfig
from foundation.infrastructure.cache.redis_config import load_config_from_env
from langchain_community.storage import RedisStore


def _get_redis_logger():
    """
    Return the logger used by this module.

    The project logger is imported lazily, at call time, to avoid a circular
    import; when it cannot be imported a standard ``logging`` logger named
    after this module is returned instead.
    """
    try:
        from foundation.observability.logger.loggering import server_logger
        return server_logger
    except ImportError:
        import logging
        return logging.getLogger(__name__)


def with_redis_retry(max_retries: int = 3, delay: float = 1.0):
    """
    Decorator that retries an async Redis method after a connection failure.

    Only ``ConnectionResetError`` and the redis ``ConnectionError`` trigger a
    retry; every other exception propagates to the caller untouched. Before
    each retry the wrapper awaits ``self._reconnect()``, so the decorated
    coroutine must be a method of an object that provides that coroutine.

    Args:
        max_retries: Number of retries after the first attempt (default 3).
        delay: Base wait in seconds (default 1.0). After a successful
            reconnect the wrapper waits ``delay``; after a failed reconnect it
            waits ``delay * (attempt + 1)``, i.e. a linearly growing back-off.

    Returns:
        A decorator for ``async def method(self, ...)`` callables.

    Raises:
        The last connection error once every attempt has failed.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):  # +1 covers the first attempt
                try:
                    return await func(self, *args, **kwargs)
                except (ConnectionResetError, redis_ConnectionError) as e:
                    last_exception = e

                    if attempt >= max_retries:
                        _get_redis_logger().error(f"Redis操作失败,已达最大重试次数: {str(e)}")
                        break

                    _get_redis_logger().warning(
                        f"Redis连接异常 (尝试 {attempt + 1}/{max_retries + 1}): {str(e)}"
                    )

                    try:
                        await self._reconnect()
                    except Exception as reconnect_error:
                        _get_redis_logger().error(f"Redis重连失败: {str(reconnect_error)}")
                        # Reconnect failed: wait longer on each attempt
                        # (linear back-off), then try the operation again.
                        await asyncio.sleep(delay * (attempt + 1))
                        continue

                    _get_redis_logger().info("Redis重连成功,重新执行操作")
                    await asyncio.sleep(delay)  # let the new connection settle
                # Exceptions unrelated to the connection are deliberately not
                # caught here, so they reach the caller with their traceback.

            # Every attempt failed.
            raise last_exception

        return wrapper

    return decorator


class RedisConnection(Protocol):
    """
    Interface of the Redis operations used by this project.

    The signatures mirror the ones implemented by ``RedisAdapter``.
    """

    async def get(self, key: str) -> Any: ...

    async def set(self, key: str, value: Any, ex: Optional[int] = None, nx: bool = False) -> bool: ...

    async def setex(self, key: str, time: int, value: Any) -> bool: ...

    async def hget(self, key: str, field: str) -> Any: ...

    async def hset(self, key: str, field: str, value: Any) -> int: ...

    async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool: ...

    async def hgetall(self, key: str) -> Dict[str, Any]: ...

    async def delete(self, *keys: str) -> int: ...

    async def exists(self, key: str) -> int: ...

    async def expire(self, key: str, seconds: int) -> bool: ...

    async def scan(self, cursor: int, match: Optional[str] = None, count: Optional[int] = None) -> tuple[
        int, list[str]]: ...

    # Matches the adapter: the key count comes first, then keys and arguments
    # are passed as individual positional values.
    async def eval(self, script: str, numkeys: int, *keys_and_args: str) -> Any: ...

    # Set operations
    async def sadd(self, key: str, *values: str) -> int: ...

    async def scard(self, key: str) -> int: ...

    async def srem(self, key: str, *values: str) -> int: ...

    async def smembers(self, key: str) -> Set[str]: ...

    # Client handed to LangChain storage (created with decode_responses=False
    # by the adapter).
    def get_langchain_redis_client(self) -> Any: ...

    async def close(self) -> None: ...
class RedisAdapter(RedisConnection):
    """
    Redis adapter built on the asyncio Redis client.

    Holds two clients created from the same configuration: ``_redis`` for the
    regular operations (responses decoded to ``str``) and
    ``_langchain_redis_client`` for LangChain storage (raw ``bytes``).
    Every operation is wrapped by ``with_redis_retry``, which calls
    ``_reconnect`` when a connection error occurs.
    """

    def __init__(self, config: RedisConfig):
        self.config = config
        # Client for regular Redis operations; created by connect().
        self._redis = None
        # Client for LangChain RedisStore usage; created by connect().
        self._langchain_redis_client = None

    async def connect(self):
        """
        Create both Redis clients from ``self.config``.

        Returns:
            This adapter, so the call can be chained.
        """
        # Minimal TCP keep-alive configuration (kept simple for Windows
        # compatibility, per the original author's note).
        socket_options = {
            'socket_keepalive': True,
            'socket_connect_timeout': 10,  # connect timeout: 10 seconds
            'socket_timeout': 30,  # read/write timeout: 30 seconds
        }
        shared_options = dict(
            password=self.config.password,
            db=self.config.db,
            encoding="utf-8",
            max_connections=self.config.max_connections,
            **socket_options
        )

        self._redis = redis_asyncio.from_url(
            self.config.url,
            decode_responses=True,
            **shared_options
        )

        # LangChain needs bytes, so responses must NOT be decoded here.
        # NOTE: earlier attempts (a synchronous ``redis.Redis.from_url``
        # client and an ``async_redis.from_url`` client) were abandoned; the
        # original file recorded the error
        # "Expected Redis client, got Redis instead" next to them.
        self._langchain_redis_client = redis_asyncio.from_url(
            self.config.url,
            decode_responses=False,
            **shared_options
        )
        return self

    @with_redis_retry()
    async def get(self, key: str) -> Any:
        """Return the value stored at ``key``."""
        return await self._redis.get(key)

    @with_redis_retry()
    async def set(self, key: str, value: Any, ex: Optional[int] = None, nx: bool = False) -> bool:
        """Set ``key`` to ``value``, forwarding the ``ex`` and ``nx`` options."""
        return await self._redis.set(key, value, ex=ex, nx=nx)

    @with_redis_retry()
    async def setex(self, key: str, time: int, value: Any) -> bool:
        """Set ``key`` to ``value`` with an expiry of ``time``."""
        return await self._redis.setex(key, time, value)

    @with_redis_retry()
    async def hget(self, key: str, field: str) -> Any:
        """Return the value of ``field`` in the hash at ``key``."""
        return await self._redis.hget(key, field)

    @with_redis_retry()
    async def hset(self, key: str, field: str, value: Any) -> int:
        """Set ``field`` to ``value`` in the hash at ``key``."""
        return await self._redis.hset(key, field, value)

    @with_redis_retry()
    async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool:
        """
        Set several fields of the hash at ``key`` from ``mapping``.

        Uses ``hset(mapping=...)`` because the client's ``hmset`` is
        deprecated. ``True`` is returned on success to keep this method's
        historical ``bool`` result.
        """
        await self._redis.hset(key, mapping=mapping)
        return True

    @with_redis_retry()
    async def hgetall(self, key: str) -> Dict[str, Any]:
        """Return all fields and values of the hash at ``key``."""
        return await self._redis.hgetall(key)

    @with_redis_retry()
    async def delete(self, *keys: str) -> int:
        """Delete the given keys."""
        return await self._redis.delete(*keys)

    @with_redis_retry()
    async def exists(self, key: str) -> int:
        """Report whether ``key`` exists."""
        return await self._redis.exists(key)

    @with_redis_retry()
    async def expire(self, key: str, seconds: int) -> bool:
        """Set a time-to-live of ``seconds`` on ``key``."""
        return await self._redis.expire(key, seconds)

    @with_redis_retry()
    async def scan(self, cursor: int, match: Optional[str] = None, count: Optional[int] = None) -> tuple[
        int, list[str]]:
        """Run one SCAN step from ``cursor`` with optional ``match``/``count``."""
        return await self._redis.scan(cursor, match=match, count=count)

    @with_redis_retry()
    async def eval(self, script: str, numkeys: int, *keys_and_args: str) -> Any:
        """
        Execute a script on the server.

        Args:
            script: Script source.
            numkeys: How many of ``keys_and_args`` are keys.
            keys_and_args: Keys followed by arguments, passed on as
                individual positional values.
        """
        return await self._redis.eval(script, numkeys, *keys_and_args)

    # Set operations
    @with_redis_retry()
    async def sadd(self, key: str, *values: str) -> int:
        """Add members to the set at ``key``; returns the client's result."""
        return await self._redis.sadd(key, *values)

    @with_redis_retry()
    async def scard(self, key: str) -> int:
        """Return the number of members of the set at ``key``."""
        return await self._redis.scard(key)

    @with_redis_retry()
    async def srem(self, key: str, *values: str) -> int:
        """Remove members from the set at ``key``; returns the client's result."""
        return await self._redis.srem(key, *values)

    @with_redis_retry()
    async def smembers(self, key: str) -> Set[str]:
        """Return all members of the set at ``key``."""
        return await self._redis.smembers(key)

    def get_langchain_redis_client(self):
        """Return the bytes-mode client created for LangChain storage."""
        return self._langchain_redis_client

    async def _reconnect(self) -> None:
        """
        Close both clients, wait one second, and create them again.

        Closing goes through ``close()`` so that reconnecting and shutting
        down release the clients the same way (``wait_closed`` is not called;
        ``close()`` marks it as deprecated). A failure while closing the old
        clients is logged and ignored so that fresh clients are still built.

        Raises:
            Exception: whatever ``connect()`` raised, after logging it.
        """
        try:
            _get_redis_logger().info("正在重新连接Redis...")

            try:
                await self.close()
            except Exception as close_error:
                # The old connection is being discarded anyway.
                _get_redis_logger().debug(f"关闭旧Redis连接时出错: {close_error}")

            # Short pause before reconnecting.
            await asyncio.sleep(1)

            await self.connect()
            _get_redis_logger().info("Redis重连成功")

        except Exception as e:
            _get_redis_logger().error(f"Redis重连失败: {str(e)}")
            raise

    async def close(self) -> None:
        """Close both clients if they were created."""
        # wait_closed() is intentionally not awaited: it is deprecated.
        if self._redis:
            await self._redis.close()
        if self._langchain_redis_client:
            await self._langchain_redis_client.close()


class RedisConnectionFactory:
    """
    Factory that caches one Redis connection per ``url``/``db`` pair.

    Each cached connection is tied to the event loop it was created on; a
    request from a different loop replaces it.
    """
    _connections: Dict[str, RedisConnection] = {}
    _stores: Dict[str, RedisStore] = {}
    # Event loop each cached connection was created on.
    _connection_loops: Dict[str, asyncio.AbstractEventLoop] = {}

    @classmethod
    async def get_connection(cls) -> RedisConnection:
        """
        Return the cached connection for the configured Redis, creating it
        when missing or when the running event loop has changed.
        """
        redis_config = load_config_from_env()

        # The url/db pair identifies a connection.
        conn_id = f"{redis_config.url}-{redis_config.db}"

        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: create one and install it as current.
            current_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(current_loop)

        if conn_id in cls._connections:
            stored_loop = cls._connection_loops.get(conn_id)
            if stored_loop is not current_loop:
                _get_redis_logger().warning(
                    f"检测到事件循环变化,重新创建Redis连接: {conn_id}"
                )
                # Closing is best effort; the old connection is dropped
                # whether or not it closes cleanly.
                try:
                    await cls._connections[conn_id].close()
                except Exception as e:
                    _get_redis_logger().debug(f"关闭旧Redis连接时出错: {e}")
                del cls._connections[conn_id]
                cls._connection_loops.pop(conn_id, None)
                # The cached store is the old connection's client; drop it so
                # callers do not receive a closed client.
                cls._stores.pop(conn_id, None)

        if conn_id not in cls._connections:
            adapter = RedisAdapter(redis_config)
            await adapter.connect()
            cls._connections[conn_id] = adapter
            cls._connection_loops[conn_id] = current_loop
            _get_redis_logger().info(f"创建新的Redis连接: {conn_id}")

        return cls._connections[conn_id]

    @classmethod
    async def get_redis_store(cls) -> RedisStore:
        """
        Return the client intended for LangChain storage.

        NOTE(review): the value returned is the adapter's bytes-mode Redis
        client as-is; no ``RedisStore`` is constructed here despite the
        return annotation.
        """
        conn = await cls.get_connection()
        return conn.get_langchain_redis_client()

    @classmethod
    async def get_langchain_redis_store(cls) -> RedisStore:
        """
        Return the cached LangChain storage client for the configured Redis.

        The original author flagged this method as currently having problems.
        NOTE(review): as in ``get_redis_store``, the cached value is the raw
        bytes-mode client, not a ``RedisStore`` instance.
        """
        redis_config = load_config_from_env()

        # Same identifier format as the connection cache.
        store_id = f"{redis_config.url}-{redis_config.db}"

        if store_id not in cls._stores:
            conn = await cls.get_connection()
            client = conn.get_langchain_redis_client()
            store = client
            _get_redis_logger().info(f"client={client}")
            _get_redis_logger().info(f"store={dir(store)}")
            cls._stores[store_id] = store

        return cls._stores[store_id]

    @classmethod
    async def close_all(cls):
        """Close every cached connection and empty all caches."""
        # Iterate over a snapshot: the cache may change while a close() is
        # being awaited.
        for conn in list(cls._connections.values()):
            try:
                await conn.close()
            except Exception as e:
                _get_redis_logger().debug(f"关闭Redis连接时出错: {e}")
        # Clear in place so the shared class-level dicts are emptied even
        # when this is called through a subclass.
        cls._connections.clear()
        cls._connection_loops.clear()
        cls._stores.clear()

    @classmethod
    def get_connection_count(cls) -> int:
        """Return the number of cached connections."""
        return len(cls._connections)