""" Milvus向量数据库连接管理 """ import os import logging from typing import Optional # 导入配置 from app.core.config import config_handler from .embedding_connection import get_embedding_model from langchain_milvus import Milvus, BM25BuiltInFunction logger = logging.getLogger(__name__) _milvus_manager = None def get_milvus_vectorstore(collection_name: str, consistency_level: str = "Strong"): """ 获取 Milvus Vectorstore 实例(用于混合搜索) Args: collection_name: 集合名称 consistency_level: 一致性级别,默认为 "Strong" Returns: Milvus: LangChain 的 Milvus Vectorstore 实例 """ try: # 直接调用embedding_connection的embedding embedding_function = get_embedding_model() manager = get_milvus_manager() connection_args = { "uri": f"http://{manager.host}:{manager.port}", "user": manager.user, "db_name": manager.db_name } if manager.password: connection_args["password"] = manager.password # 动态检测向量字段名称,兼容旧集合(vector)和新集合(dense) vector_field_name = "dense" try: desc = manager.client.describe_collection(collection_name) fields = desc.get("fields", []) if isinstance(desc, dict) else [] float_vector_fields = [] for f in fields: f_name = f.get("name") f_type = f.get("type") if not f_name: continue # DataType.FLOAT_VECTOR 在 pymilvus 中通常是 101,字符串形式可能为 "FloatVector" if f_type == 101 or str(f_type).upper() in ("FLOAT_VECTOR", "FLOATVECTOR"): float_vector_fields.append(f_name) # 优先 dense,其次 vector,再次第一个向量字段 if "dense" in float_vector_fields: vector_field_name = "dense" elif "vector" in float_vector_fields: vector_field_name = "vector" elif float_vector_fields: vector_field_name = float_vector_fields[0] except Exception as e: logger.warning(f"自动检测向量字段失败,使用默认 'dense': {e}") vectorstore = Milvus( embedding_function=embedding_function, collection_name=collection_name, connection_args=connection_args, consistency_level=consistency_level, builtin_function=BM25BuiltInFunction(), vector_field=vector_field_name ) return vectorstore except Exception as e: logger.error(f"获取 Milvus Vectorstore 失败: {e}") raise class MilvusManager: """Milvus管理器""" def __init__(self): self.host: str = config_handler.get("admin_app", "MILVUS_HOST", "localhost") self.port: int = config_handler.get_int("admin_app", "MILVUS_PORT", 19530) self.db_name: str = config_handler.get("admin_app", "MILVUS_DB", "default") self.user: Optional[str] = config_handler.get("admin_app", "MILVUS_USER", "") self.password: Optional[str] = config_handler.get("admin_app", "MILVUS_PASSWORD", "") self.uri = f"http://{self.host}:{self.port}" logger.info(f"初始化 MilvusClient: uri={self.uri}, db={self.db_name}") # 延迟初始化 client self._client = None @property def client(self): """获取 Milvus 客户端(延迟初始化)""" if self._client is None: try: from pymilvus import MilvusClient self._client = MilvusClient( uri=self.uri, user=self.user or "", password=self.password or "", db_name=self.db_name, ) logger.info("Milvus客户端初始化成功") except Exception as e: logger.error(f"Milvus客户端初始化失败: {e}") raise return self._client def close(self) -> None: """关闭 Milvus 连接""" if self._client: try: self._client.close() logger.info("Milvus连接已关闭") except Exception as e: logger.error(f"关闭Milvus连接失败: {e}") finally: self._client = None def get_milvus_manager() -> MilvusManager: """获取 Milvus 管理器单例""" global _milvus_manager if _milvus_manager is None: _milvus_manager = MilvusManager() return _milvus_manager def get_milvus_connection(): """获取Milvus连接(兼容旧接口)""" try: return get_milvus_manager().client except Exception as e: logger.warning(f"Milvus连接失败: {e}") return None async def init_milvus(): """初始化Milvus连接""" try: get_milvus_connection() logger.info("Milvus初始化成功") except Exception as e: logger.warning(f"Milvus初始化失败: {e}") async def close_milvus(): """关闭Milvus连接""" global _milvus_manager if _milvus_manager: try: _milvus_manager.close() logger.info("Milvus连接已关闭") except Exception as e: logger.error(f"关闭Milvus连接失败: {e}") finally: _milvus_manager = None