from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence

from pymilvus import MilvusClient
from langchain_core.documents import Document

from foundation.infrastructure.config.config import config_handler


@dataclass(frozen=True)
class MilvusConfig:
    """Connection settings for Milvus.

    Both fields default to values read from ``config_handler`` (section
    ``milvus``) at the moment an instance is created, so each new
    ``MilvusConfig()`` reflects the configuration as it is at that time.
    """

    # "http://<MILVUS_HOST>:<MILVUS_PORT>"; int() makes a non-numeric port
    # fail here rather than later inside the client.
    uri: str = field(
        default_factory=lambda: (
            f"http://{config_handler.get('milvus', 'MILVUS_HOST', 'localhost')}:"
            f"{int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))}"
        )
    )
    # Resolved per instance (like ``uri``) instead of once at import time.
    db_name: str = field(
        default_factory=lambda: config_handler.get('milvus', 'MILVUS_DB', 'lq_db')
    )


class MilvusManager:
    """Small manager built on ``pymilvus.MilvusClient`` (no langchain-milvus).

    - Initialization: creates the client and calls ``use_database(db_name)``.
    - Queries: ``collection_name`` is passed on every call (not fixed).
    - Provides:
        1) ``condition_query``: pure condition query (``MilvusClient.query``).
    """

    def __init__(self, cfg: MilvusConfig):
        """Create the client for ``cfg.uri`` and switch to ``cfg.db_name``."""
        self.cfg = cfg
        self.client = MilvusClient(uri=self.cfg.uri)
        self.client.use_database(self.cfg.db_name)

        # Conventional field name (adjust to match your schema).
        self.text_field = "text"

    def close(self) -> None:
        """Release the underlying client connection."""
        self.client.close()

    def list_collections(self) -> List[str]:
        """Return the collection names reported by the client."""
        return self.client.list_collections()

    def condition_query(
        self,
        *,
        collection_name: str,
        filter: str,  # name shadows the builtin but is part of the public API
        output_fields: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Run a pure condition query against ``collection_name``.

        Args:
            collection_name: Collection to query; must be non-empty.
            filter: Filter expression, e.g.
                ``parent_id == 'xxx'`` or
                ``tenant == 't1' and source == 'pdf'``.
            output_fields: Fields to return, e.g. ``["text"]`` or
                ``["text", "parent_id", "chunk_id"]``. Defaults to
                ``[self.text_field]``.
            limit: Maximum number of rows. When ``None`` the argument is
                not forwarded to the client at all.

        Returns:
            The rows returned by ``MilvusClient.query``.

        Raises:
            ValueError: If ``collection_name`` is empty.
            RuntimeError: If the collection does not exist in the current
                database.
        """
        if not collection_name:
            raise ValueError("collection_name 不能为空")

        if output_fields is None:
            output_fields = [self.text_field]

        # Validate up front: the client's own exception for a missing
        # collection is less readable than this message.
        if not self.client.has_collection(collection_name):
            existing = self.client.list_collections()
            raise RuntimeError(
                f"collection not found: {collection_name}\n"
                f"current db_name={self.cfg.db_name}, uri={self.cfg.uri}\n"
                f"collections in current db: {existing}"
            )

        query_kwargs: Dict[str, Any] = {
            "collection_name": collection_name,
            "filter": filter,
            "output_fields": list(output_fields),
        }
        # Only forward ``limit`` when the caller actually set one.
        if limit is not None:
            query_kwargs["limit"] = limit

        return self.client.query(**query_kwargs)


if __name__ == "__main__":
    mv = MilvusManager(MilvusConfig())
    try:
        docs = mv.condition_query(
            collection_name="rag_parent_hybrid",
            filter="parent_id == '02267e1d-11d7-4a3d-b53f-e205edd6758f'",
            limit=10,
        )
        print(docs)
    finally:
        mv.close()