| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from __future__ import annotations
- from dataclasses import dataclass, field
- from typing import Any, Dict, List, Optional, Sequence
- from pymilvus import MilvusClient
- from langchain_core.documents import Document
- from foundation.infrastructure.config.config import config_handler
@dataclass(frozen=True)
class MilvusConfig:
    """Immutable connection settings for a Milvus server.

    Both fields default to values looked up through ``config_handler`` in the
    ``milvus`` section. The lookups run when an instance is created, not when
    this module is imported, so a caller that passes both fields explicitly
    never touches the configuration.
    """

    # HTTP endpoint built from MILVUS_HOST / MILVUS_PORT (fallbacks: localhost,
    # 19530). The int() cast rejects a non-numeric port with a ValueError
    # instead of producing a malformed URI.
    uri: str = field(
        default_factory=lambda: (
            f"http://{config_handler.get('milvus', 'MILVUS_HOST', 'localhost')}:"
            f"{int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))}"
        )
    )
    # Database name from MILVUS_DB (fallback: lq_db). Resolved per instance,
    # the same way ``uri`` is, rather than once at class-definition time.
    db_name: str = field(
        default_factory=lambda: config_handler.get("milvus", "MILVUS_DB", "lq_db")
    )
class MilvusManager:
    """Thin helper around ``pymilvus.MilvusClient`` (langchain-milvus is not used).

    - Construction creates the client and switches it to ``cfg.db_name``.
    - No collection is bound to the instance; every query names its own
      ``collection_name``.
    - ``condition_query`` runs a pure filter-expression query through
      ``MilvusClient.query``.
    - ``close()`` (or using the manager in a ``with`` block) releases the
      underlying client.
    """

    def __init__(self, cfg: "MilvusConfig"):
        """Connect to ``cfg.uri`` and select the database ``cfg.db_name``."""
        self.cfg = cfg
        self.client = MilvusClient(uri=self.cfg.uri)
        self.client.use_database(self.cfg.db_name)
        # Field returned when a query does not specify output_fields;
        # adjust to match the collection schema in use.
        self.text_field = "text"

    def close(self) -> None:
        """Close the underlying Milvus client connection."""
        self.client.close()

    def __enter__(self) -> "MilvusManager":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def list_collections(self) -> List[str]:
        """Return the collection names the client reports for the current database."""
        return self.client.list_collections()

    def condition_query(
        self,
        *,
        collection_name: str,
        filter: str,
        output_fields: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Run a filter-only query against ``collection_name``.

        Args:
            collection_name: Collection to query; must be non-empty.
            filter: Milvus boolean expression, e.g. ``parent_id == 'xxx'`` or
                ``tenant == 't1' and source == 'pdf'``.
            output_fields: Field names to return, e.g. ``["text"]`` or
                ``["text", "parent_id", "chunk_id"]``. A single bare string is
                treated as one field name. Defaults to ``[self.text_field]``.
            limit: Forwarded unchanged to ``MilvusClient.query``.

        Returns:
            Whatever ``MilvusClient.query`` returns for the request.

        Raises:
            ValueError: If ``collection_name`` is empty.
            RuntimeError: If the collection does not exist in the current
                database; the message lists the collections that do.
        """
        if not collection_name:
            raise ValueError("collection_name 不能为空")

        if output_fields is None:
            fields = [self.text_field]
        elif isinstance(output_fields, str):
            # A str is itself a Sequence[str]; list("text") would split it
            # into single characters, so wrap it as one field name instead.
            fields = [output_fields]
        else:
            fields = list(output_fields)

        # Check up front so a missing collection gives a readable error
        # instead of a less obvious MilvusException from the query call.
        if not self.client.has_collection(collection_name):
            existing = self.client.list_collections()
            raise RuntimeError(
                f"collection not found: {collection_name}\n"
                f"current db_name={self.cfg.db_name}, uri={self.cfg.uri}\n"
                f"collections in current db: {existing}"
            )

        return self.client.query(
            collection_name=collection_name,
            filter=filter,
            output_fields=fields,
            limit=limit,
        )
if __name__ == "__main__":
    # Manual smoke test: connect with the configured defaults and print up to
    # ten rows of one collection that match a fixed parent_id.
    manager = MilvusManager(MilvusConfig())
    parent_filter = "parent_id == '02267e1d-11d7-4a3d-b53f-e205edd6758f'"
    matched_rows = manager.condition_query(
        collection_name="rag_parent_hybrid",
        filter=parent_filter,
        limit=10,
    )
    print(matched_rows)
|