| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- """
- Milvus向量数据库连接管理
- """
- import os
- import logging
- from typing import Optional
- # 导入配置
- from app.core.config import config_handler
- from .embedding_connection import get_embedding_model
- from langchain_milvus import Milvus, BM25BuiltInFunction
# Module-level logger shared by every function and class in this file.
logger = logging.getLogger(__name__)
# Process-wide MilvusManager singleton: created lazily by get_milvus_manager()
# and reset to None by close_milvus().
_milvus_manager: Optional["MilvusManager"] = None
def _is_float_vector_type(field_type) -> bool:
    """Return True if a ``describe_collection`` field type denotes FLOAT_VECTOR."""
    # pymilvus defines DataType.FLOAT_VECTOR as 101 (an int enum, so == works).
    if field_type == 101:
        return True
    # Otherwise compare by name: enum members expose ``.name``; plain strings
    # may be spelled "FLOAT_VECTOR" or "FloatVector".
    type_name = getattr(field_type, "name", None) or str(field_type)
    return type_name.upper().replace("_", "") == "FLOATVECTOR"


def _detect_vector_field(manager, collection_name: str) -> str:
    """Pick the dense vector field name of *collection_name* (best effort).

    Preference order: "dense" (new collections), then "vector" (legacy
    collections), then the first FLOAT_VECTOR field found. Returns "dense"
    when the collection cannot be described or has no FLOAT_VECTOR field;
    any error during detection is logged as a warning and never raised.
    """
    try:
        desc = manager.client.describe_collection(collection_name)
        fields = desc.get("fields", []) if isinstance(desc, dict) else []
        candidates = [
            f.get("name")
            for f in fields
            if f.get("name") and _is_float_vector_type(f.get("type"))
        ]
    except Exception as e:
        logger.warning(f"自动检测向量字段失败,使用默认 'dense': {e}")
        return "dense"

    for preferred in ("dense", "vector"):
        if preferred in candidates:
            return preferred
    return candidates[0] if candidates else "dense"


def get_milvus_vectorstore(collection_name: str, consistency_level: str = "Strong"):
    """Build a LangChain ``Milvus`` vectorstore for hybrid (dense + BM25) search.

    Args:
        collection_name: Name of the Milvus collection.
        consistency_level: Milvus consistency level, "Strong" by default.

    Returns:
        Milvus: a LangChain Milvus vectorstore bound to the shared manager's
        server, using the embedding model from ``get_embedding_model()``.

    Raises:
        Any exception raised while loading the embedding model or constructing
        the vectorstore; it is logged (with traceback) and re-raised.
    """
    try:
        embedding_function = get_embedding_model()
        manager = get_milvus_manager()

        # Reuse the manager's URI so the raw client and the vectorstore can
        # never disagree about which server they talk to.
        connection_args = {
            "uri": manager.uri,
            "user": manager.user,
            "db_name": manager.db_name,
        }
        if manager.password:
            connection_args["password"] = manager.password

        # Detect the field name so both legacy ("vector") and new ("dense")
        # collections work.
        vector_field_name = _detect_vector_field(manager, collection_name)

        # NOTE(review): langchain_milvus examples pair BM25BuiltInFunction with
        # vector_field=["dense", "sparse"]; confirm a single dense field name
        # is accepted for existing collections.
        return Milvus(
            embedding_function=embedding_function,
            collection_name=collection_name,
            connection_args=connection_args,
            consistency_level=consistency_level,
            builtin_function=BM25BuiltInFunction(),
            vector_field=vector_field_name,
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"获取 Milvus Vectorstore 失败: {e}")
        raise
class MilvusManager:
    """Milvus connection settings plus a lazily created ``MilvusClient``.

    Settings are read from the ``admin_app`` config section (``MILVUS_HOST``,
    ``MILVUS_PORT``, ``MILVUS_DB``, ``MILVUS_USER``, ``MILVUS_PASSWORD``).
    No client is built until :attr:`client` is first accessed.
    """

    def __init__(self):
        section = "admin_app"
        cfg = config_handler
        self.host: str = cfg.get(section, "MILVUS_HOST", "localhost")
        self.port: int = cfg.get_int(section, "MILVUS_PORT", 19530)
        self.db_name: str = cfg.get(section, "MILVUS_DB", "default")
        self.user: Optional[str] = cfg.get(section, "MILVUS_USER", "")
        self.password: Optional[str] = cfg.get(section, "MILVUS_PASSWORD", "")

        # Plain-HTTP endpoint derived from host and port.
        self.uri = f"http://{self.host}:{self.port}"
        logger.info(f"初始化 MilvusClient: uri={self.uri}, db={self.db_name}")

        # Filled in on demand by the `client` property; cleared by close().
        self._client = None

    @property
    def client(self):
        """Return the cached ``MilvusClient``, creating it on first access.

        Construction errors are logged and re-raised; nothing is cached on
        failure, so the next access retries.
        """
        if self._client is not None:
            return self._client
        try:
            # Imported here rather than at module level, presumably to defer
            # loading pymilvus until a client is actually needed.
            from pymilvus import MilvusClient
            self._client = MilvusClient(
                uri=self.uri,
                user=self.user or "",
                password=self.password or "",
                db_name=self.db_name,
            )
            logger.info("Milvus客户端初始化成功")
        except Exception as err:
            logger.error(f"Milvus客户端初始化失败: {err}")
            raise
        return self._client

    def close(self) -> None:
        """Close the cached client, if any; close errors are logged, not raised.

        The cached client is discarded in every case, so a later access to
        :attr:`client` builds a fresh one.
        """
        if not self._client:
            return
        try:
            self._client.close()
            logger.info("Milvus连接已关闭")
        except Exception as err:
            logger.error(f"关闭Milvus连接失败: {err}")
        finally:
            self._client = None
def get_milvus_manager() -> MilvusManager:
    """Return the module-wide :class:`MilvusManager`, building it on first call.

    Creation is not guarded by a lock, so it is not synchronised across threads.
    """
    global _milvus_manager
    if _milvus_manager is not None:
        return _milvus_manager
    _milvus_manager = MilvusManager()
    return _milvus_manager
def get_milvus_connection():
    """Return the shared Milvus client, or ``None`` if it cannot be obtained.

    Kept for backward compatibility with the old interface. Unlike
    ``get_milvus_manager().client``, it never raises: any failure is logged
    as a warning and ``None`` is returned instead.
    """
    try:
        manager = get_milvus_manager()
        connection = manager.client
    except Exception as exc:
        logger.warning(f"Milvus连接失败: {exc}")
        connection = None
    return connection
async def init_milvus():
    """Eagerly create the shared Milvus client (best effort, never raises).

    ``get_milvus_connection`` catches connection errors itself and returns
    ``None`` instead of raising. The result therefore has to be checked
    explicitly; otherwise a failed connection would be logged as a
    successful initialisation.
    """
    try:
        client = get_milvus_connection()
    except Exception as e:
        logger.warning(f"Milvus初始化失败: {e}")
        return
    if client is None:
        logger.warning("Milvus初始化失败: 无法获取Milvus客户端")
        return
    logger.info("Milvus初始化成功")
async def close_milvus():
    """Close and discard the shared Milvus manager, if one was created.

    Close errors are logged, not raised. The singleton is reset to ``None``
    either way, so a later ``get_milvus_manager()`` builds a fresh manager.
    """
    global _milvus_manager
    manager = _milvus_manager
    if not manager:
        return
    try:
        manager.close()
        logger.info("Milvus连接已关闭")
    except Exception as exc:
        logger.error(f"关闭Milvus连接失败: {exc}")
    finally:
        _milvus_manager = None
|