| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- # !/usr/bin/python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :redis_connection.py.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/21 15:07
- '''
- import redis # 同步专用
- # 尝试导入异步Redis模块
- try:
- from redis import asyncio as redis_asyncio
- except ImportError:
- try:
- import aioredis as redis_asyncio
- except ImportError:
- raise ImportError("Neither redis.asyncio nor aioredis is available. Please install 'redis[asyncio]' or 'aioredis'")
- # 导入Redis异常类
- from redis.exceptions import ConnectionError as redis_ConnectionError
- from typing import Optional, Protocol, Dict, Any, Set, Tuple
- from functools import wraps
- import asyncio
- from foundation.base.redis_config import RedisConfig
- from foundation.base.redis_config import load_config_from_env
- from foundation.logger.loggering import server_logger
- from typing import Dict, Any, List, Tuple
- from langchain_community.storage import RedisStore
- def with_redis_retry(max_retries: int = 3, delay: float = 1.0):
- """
- Redis操作重连装饰器
- Args:
- max_retries: 最大重试次数,默认3次
- delay: 重试间隔秒数,默认1秒
- """
- def decorator(func):
- @wraps(func)
- async def wrapper(self, *args, **kwargs):
- last_exception = None
- for attempt in range(max_retries + 1): # +1 包含第一次尝试
- try:
- return await func(self, *args, **kwargs)
- except (ConnectionResetError, redis_ConnectionError) as e:
- last_exception = e
- if attempt < max_retries:
- server_logger.warning(
- f"Redis连接异常 (尝试 {attempt + 1}/{max_retries + 1}): {str(e)}"
- )
- # 尝试重连
- try:
- await self._reconnect()
- except Exception as reconnect_error:
- server_logger.error(f"Redis重连失败: {str(reconnect_error)}")
- # 如果重连失败,继续重试
- await asyncio.sleep(delay * (attempt + 1)) # 指数退避
- continue
- server_logger.info(f"Redis重连成功,重新执行操作")
- await asyncio.sleep(delay) # 等待连接稳定
- else:
- server_logger.error(f"Redis操作失败,已达最大重试次数: {str(e)}")
- break
- except Exception as e:
- # 非连接相关的异常直接抛出
- raise e
- # 所有重试都失败了
- raise last_exception
- return wrapper
- return decorator
- class RedisConnection(Protocol):
- """
- Redis 接口协议
- """
- async def get(self, key: str) -> Any: ...
- async def set(self, key: str, value: Any, ex: Optional[int] = None, nx: bool = False) -> bool: ...
- async def hget(self, key: str, field: str) -> Any: ...
- async def hset(self, key: str, field: str, value: Any) -> int: ...
- async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool: ...
- async def hgetall(self, key: str) -> Dict[str, Any]: ...
- async def delete(self, *keys: str) -> int: ...
- async def exists(self, key: str) -> int: ...
- async def expire(self, key: str, seconds: int) -> bool: ...
- async def scan(self, cursor: int, match: Optional[str] = None, count: Optional[int] = None) -> tuple[
- int, list[str]]: ...
- async def eval(self, script: str, keys: list[str], args: list[str]) -> Any: ...
- # 集合操作方法
- async def sadd(self, key: str, *values: str) -> int: ...
- async def scard(self, key: str) -> int: ...
- async def srem(self, key: str, *values: str) -> int: ...
- async def smembers(self, key: str) -> Set[str]: ...
- async def close(self) -> None: ...
- class RedisAdapter(RedisConnection):
- """
- Redis 适配器
- """
- def __init__(self, config: RedisConfig):
- self.config = config
- # 用于普通Redis 操作存储
- self._redis = None
- # 用于 langchain RedisStore 存储
- self._langchain_redis_client = None
- async def connect(self):
- """创建Redis连接"""
- # 简化的TCP Keep-Alive配置(兼容Windows系统)
- socket_options = {
- 'socket_keepalive': True,
- 'socket_connect_timeout': 10, # 连接超时10秒
- 'socket_timeout': 30, # 读写超时30秒
- }
- # 使用新版本的redis.asyncio
- self._redis = redis_asyncio.from_url(
- self.config.url,
- password=self.config.password,
- db=self.config.db,
- encoding="utf-8",
- decode_responses=True,
- max_connections=self.config.max_connections,
- **socket_options
- )
- # 用于 langchain RedisStore 存储
- # 必须设为 False(LangChain 需要 bytes 数据)
- self._langchain_redis_client = redis_asyncio.from_url(
- self.config.url,
- password=self.config.password,
- db=self.config.db,
- encoding="utf-8",
- decode_responses=False,
- max_connections=self.config.max_connections,
- **socket_options
- )
-
- # ✅ 使用同步 Redis 客户端
- # self._langchain_redis_client = redis.Redis.from_url(
- # self.config.url,
- # password=self.config.password,
- # db=self.config.db,
- # decode_responses=False, # LangChain 需要 bytes
- # )
- #错误:Expected Redis client, got Redis instead
- # self._langchain_redis_client = async_redis.from_url(
- # self.config.url,
- # password=self.config.password,
- # db=self.config.db,
- # decode_responses=False
- # )
-
- return self
- @with_redis_retry()
- async def get(self, key: str) -> Any:
- """获取Redis键值"""
- return await self._redis.get(key)
- @with_redis_retry()
- async def set(self, key: str, value: Any, ex: Optional[int] = None, nx: bool = False) -> bool:
- """设置Redis键值"""
- return await self._redis.set(key, value, ex=ex, nx=nx)
- @with_redis_retry()
- async def setex(self, key: str, time: int, value: Any) -> bool:
- """设置Redis键值并指定过期时间"""
- return await self._redis.setex(key, time, value)
- @with_redis_retry()
- async def hget(self, key: str, field: str) -> Any:
- return await self._redis.hget(key, field)
- @with_redis_retry()
- async def hset(self, key: str, field: str, value: Any) -> int:
- return await self._redis.hset(key, field, value)
- @with_redis_retry()
- async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool:
- return await self._redis.hmset(key, mapping)
- @with_redis_retry()
- async def hgetall(self, key: str) -> Dict[str, Any]:
- return await self._redis.hgetall(key)
- @with_redis_retry()
- async def delete(self, *keys: str) -> int:
- return await self._redis.delete(*keys)
- @with_redis_retry()
- async def exists(self, key: str) -> int:
- return await self._redis.exists(key)
- @with_redis_retry()
- async def expire(self, key: str, seconds: int) -> bool:
- return await self._redis.expire(key, seconds)
- @with_redis_retry()
- async def scan(self, cursor: int, match: Optional[str] = None, count: Optional[int] = None) -> tuple[
- int, list[str]]:
- return await self._redis.scan(cursor, match=match, count=count)
- @with_redis_retry()
- async def eval(self, script: str, numkeys: int, *keys_and_args: str) -> Any:
- """执行Redis脚本"""
- return await self._redis.eval(script, numkeys, *keys_and_args) # 解包成独立参数
- # 集合操作方法实现
- @with_redis_retry()
- async def sadd(self, key: str, *values: str) -> int:
- """向集合添加成员,返回添加的成员数量"""
- return await self._redis.sadd(key, *values)
- @with_redis_retry()
- async def scard(self, key: str) -> int:
- """获取集合成员数量"""
- return await self._redis.scard(key)
- @with_redis_retry()
- async def srem(self, key: str, *values: str) -> int:
- """从集合删除成员,返回删除的成员数量"""
- return await self._redis.srem(key, *values)
- @with_redis_retry()
- async def smembers(self, key: str) -> Set[str]:
- """获取集合所有成员"""
- return await self._redis.smembers(key)
- def get_langchain_redis_client(self):
- return self._langchain_redis_client
- async def _reconnect(self) -> None:
- """重新连接Redis"""
- try:
- server_logger.info("正在重新连接Redis...")
- if self._redis:
- await self._redis.close()
- await self._redis.wait_closed()
- if self._langchain_redis_client:
- await self._langchain_redis_client.close()
- await self._langchain_redis_client.wait_closed()
- # 等待短暂时间后重连
- await asyncio.sleep(1)
- # 重新建立连接
- await self.connect()
- server_logger.info("Redis重连成功")
- except Exception as e:
- server_logger.error(f"Redis重连失败: {str(e)}")
- raise
- async def close(self) -> None:
- if self._redis:
- await self._redis.close()
- #await self._redis.wait_closed() #该方法已弃用
- if self._langchain_redis_client:
- await self._langchain_redis_client.close()
- #await self._langchain_redis_client.wait_closed()
- class RedisConnectionFactory:
- """
- redis 连接工厂函数
- """
- _connections: Dict[str, RedisConnection] = {}
- _stores: Dict[str, RedisStore] = {}
- @classmethod
- async def get_connection(cls) -> RedisConnection:
- """获取Redis连接(单例模式)"""
- # 加载配置
- redis_config = load_config_from_env()
- #server_logger.info(f"redis_config={redis_config}")
- # 使用配置参数生成唯一标识
- conn_id = f"{redis_config.url}-{redis_config.db}"
- if conn_id not in cls._connections:
- adapter = RedisAdapter(redis_config)
- await adapter.connect()
- cls._connections[conn_id] = adapter
- return cls._connections[conn_id]
- @classmethod
- async def get_redis_store(cls) -> RedisStore:
- """获取 LangChain RedisStore 实例"""
- # 加载配置
- redis_config = load_config_from_env()
- conn = await cls.get_connection() # 或通过其他方式获取
- client = conn.get_langchain_redis_client()
- return client
- @classmethod
- async def get_langchain_redis_store(cls) -> RedisStore:
- """获取 LangChain RedisStore 实例
- 目前该方法存在问题
- """
- # 加载配置
- redis_config = load_config_from_env()
- # 使用配置参数生成唯一标识
- store_id = f"{redis_config.url}-{redis_config.db}"
- if store_id not in cls._stores:
- conn = await cls.get_connection() # 或通过其他方式获取
- client = conn.get_langchain_redis_client()
- store = client
- server_logger.info(f"client={client}")
- server_logger.info(f"store={dir(store)}")
- cls._stores[store_id] = store
- return cls._stores[store_id]
- @classmethod
- async def close_all(cls):
- """关闭所有Redis连接"""
- for conn in cls._connections.values():
- await conn.close()
- cls._connections = {}
- @classmethod
- def get_connection_count(cls) -> int:
- """获取当前连接数"""
- return len(cls._connections)
|