# milvus_service.py
  1. """
  2. Milvus Service:业务层(直接用 manager.client 调 Milvus 原生方法)
  3. """
  4. from __future__ import annotations
  5. import sys
  6. import os
  7. import logging
  8. import re
  9. import hashlib
  10. import time
  11. from typing import List, Dict, Any, Optional
  12. from datetime import datetime
  13. from app.base import get_milvus_manager, get_milvus_vectorstore, get_embedding_model
  14. from app.core.config import config_handler
  15. from langchain_core.documents import Document
  16. logger = logging.getLogger(__name__)
  17. # 默认集合名称
  18. PARENT_COLLECTION_NAME = config_handler.get("admin_app", "PARENT_COLLECTION_NAME", "test_27_parent")
  19. CHILD_COLLECTION_NAME = config_handler.get("admin_app", "CHILD_COLLECTION_NAME", "test_27_child")
class MilvusService:
    """Business-layer service that calls native Milvus methods via ``manager.client``."""

    def __init__(self):
        # Raw Milvus client shared from the application-wide manager.
        self.client = get_milvus_manager().client
        # Embedding model handle (project-provided); name kept for compatibility.
        self.emdmodel = get_embedding_model()
        # Default dense-vector dimension (Qwen3-Embedding-8B default).
        self.DENSE_DIM = 4096
  27. def ensure_collections(self):
  28. """确保系统默认集合已创建"""
  29. collections = [PARENT_COLLECTION_NAME, CHILD_COLLECTION_NAME]
  30. for name in collections:
  31. self.ensure_collection_exists(name)
  32. async def insert_knowledge(self, content: str, doc_info: Dict[str, Any]):
  33. """将 Markdown 内容切分并入库 (支持父子段分表)"""
  34. try:
  35. doc_id = doc_info.get("doc_id")
  36. doc_name = doc_info.get("doc_name")
  37. doc_version = doc_info.get("doc_version", int(time.time()))
  38. tags = doc_info.get("tags", "")
  39. user_id = doc_info.get("user_id", "system")
  40. kb_method = doc_info.get("kb_method")
  41. target_collection = doc_info.get("collection_name") or PARENT_COLLECTION_NAME
  42. from langchain_text_splitters import RecursiveCharacterTextSplitter
  43. if kb_method == "parent_child":
  44. # --- 方案 A: 父子段分表入库 ---
  45. parent_col = f"{target_collection}_parent"
  46. child_col = f"{target_collection}_child"
  47. # 1. 切分父段 (较大块)
  48. parent_splitter = RecursiveCharacterTextSplitter(
  49. chunk_size=1000,
  50. chunk_overlap=100
  51. )
  52. parent_chunks = parent_splitter.split_text(content)
  53. parent_docs = []
  54. child_docs = []
  55. for p_idx, p_content in enumerate(parent_chunks):
  56. # 生成唯一的 parent_id
  57. p_id = hashlib.sha1(f"{doc_id}_p_{p_idx}".encode()).hexdigest()
  58. # 准备父段文档 (Metadata 不包含向量,仅用于检索回显)
  59. p_metadata = self._prepare_metadata(doc_info, p_id, p_idx, p_id)
  60. parent_docs.append(Document(page_content=p_content, metadata=p_metadata))
  61. # 2. 在每个父段内部切分子段 (较小块)
  62. child_splitter = RecursiveCharacterTextSplitter(
  63. chunk_size=300,
  64. chunk_overlap=30
  65. )
  66. child_chunks = child_splitter.split_text(p_content)
  67. for c_idx, c_content in enumerate(child_chunks):
  68. # 子段的 parent_id 指向父段的 p_id
  69. c_metadata = self._prepare_metadata(doc_info, p_id, c_idx, p_id)
  70. child_docs.append(Document(page_content=c_content, metadata=c_metadata))
  71. # 确保两个集合都存在
  72. self.ensure_collection_exists(parent_col)
  73. self.ensure_collection_exists(child_col)
  74. # 分别入库
  75. if parent_docs:
  76. get_milvus_vectorstore(parent_col).add_documents(parent_docs)
  77. if child_docs:
  78. get_milvus_vectorstore(child_col).add_documents(child_docs)
  79. logger.info(f"Successfully inserted parent-child chunks for {doc_name}: {len(parent_docs)} parents -> {len(child_docs)} children")
  80. else:
  81. # --- 常规单表入库逻辑 ---
  82. chunks = []
  83. if kb_method == "length":
  84. splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
  85. chunks = splitter.split_text(content)
  86. elif kb_method == "symbol":
  87. splitter = RecursiveCharacterTextSplitter(
  88. separators=["\n\n", "\n", "。", ";", "!", "?", "!", "?", ";"],
  89. chunk_size=500,
  90. chunk_overlap=0
  91. )
  92. chunks = splitter.split_text(content)
  93. else:
  94. chunks = [p.strip() for p in re.split(r"\n\s*\n+", content) if p.strip()]
  95. if not chunks:
  96. logger.warning(f"Document {doc_name} has no content chunks.")
  97. return
  98. documents = []
  99. for idx, chunk in enumerate(chunks):
  100. p_id = hashlib.sha1(f"{doc_id}_{idx}".encode()).hexdigest()
  101. metadata = self._prepare_metadata(doc_info, p_id, idx, p_id)
  102. documents.append(Document(page_content=chunk, metadata=metadata))
  103. self.ensure_collection_exists(target_collection)
  104. get_milvus_vectorstore(target_collection).add_documents(documents)
  105. logger.info(f"Successfully inserted {len(documents)} chunks for {doc_name} into {target_collection}")
  106. except Exception as e:
  107. logger.error(f"Error inserting knowledge into Milvus: {e}")
  108. raise
  109. def _prepare_metadata(self, doc_info: Dict[str, Any], p_id: str, index: int, parent_ref_id: str) -> Dict[str, Any]:
  110. """统一准备元数据"""
  111. doc_id = doc_info.get("doc_id")
  112. doc_name = doc_info.get("doc_name")
  113. doc_version = doc_info.get("doc_version", int(time.time()))
  114. tags = doc_info.get("tags", "")
  115. user_id = doc_info.get("user_id", "system")
  116. return {
  117. "document_id": doc_id,
  118. "parent_id": parent_ref_id,
  119. "index": index,
  120. "tag_list": tags,
  121. "permission": {},
  122. "is_deleted": 0,
  123. "created_by": user_id,
  124. "created_time": int(time.time() * 1000),
  125. "updated_by": user_id,
  126. "updated_time": int(time.time() * 1000),
  127. "metadata": {
  128. "doc_name": doc_name,
  129. "doc_version": doc_version,
  130. "outline_path": ""
  131. }
  132. }
    def ensure_collection_exists(self, name: str):
        """Ensure the named collection exists; create it with the default schema if missing.

        Also backfills any missing vector/scalar indexes and loads the
        collection into memory so it is immediately searchable.

        :param name: collection name to check/create
        """
        from pymilvus import DataType, Function, FunctionType
        # 1. Create the collection when it does not exist yet.
        if not self.client.has_collection(name):
            logger.info(f"Creating collection: {name}")
            schema = self.client.create_schema(auto_id=True, enable_dynamic_field=False)
            schema.add_field("pk", DataType.INT64, is_primary=True, auto_id=True)
            # "text" feeds the BM25 function below, so the analyzer must be enabled.
            schema.add_field("text", DataType.VARCHAR, max_length=65535, enable_analyzer=True)
            schema.add_field("dense", DataType.FLOAT_VECTOR, dim=self.DENSE_DIM)
            schema.add_field("sparse", DataType.SPARSE_FLOAT_VECTOR)
            schema.add_field("document_id", DataType.VARCHAR, max_length=256)
            schema.add_field("parent_id", DataType.VARCHAR, max_length=256)
            schema.add_field("index", DataType.INT64)
            schema.add_field("tag_list", DataType.VARCHAR, max_length=2048)
            schema.add_field("permission", DataType.JSON, nullable=True)
            schema.add_field("metadata", DataType.JSON, nullable=True)
            schema.add_field("is_deleted", DataType.INT64, default_value=0)
            schema.add_field("created_by", DataType.VARCHAR, max_length=256, nullable=True)
            schema.add_field("created_time", DataType.INT64)
            schema.add_field("updated_by", DataType.VARCHAR, max_length=256, nullable=True)
            schema.add_field("updated_time", DataType.INT64)
            # Server-side BM25 function: derives the "sparse" vector from "text".
            schema.add_function(
                Function(
                    name="bm25_fn",
                    input_field_names=["text"],
                    output_field_names=["sparse"],
                    function_type=FunctionType.BM25,
                )
            )
            self.client.create_collection(collection_name=name, schema=schema)
        # 2. Check for and backfill any missing indexes.
        # Describe the collection to confirm which fields actually exist.
        desc = self.client.describe_collection(collection_name=name)
        fields_in_collection = [f.get("name") for f in desc.get("fields", [])]
        existing_indexes = self.client.list_indexes(collection_name=name)
        index_params = self.client.prepare_index_params()
        needs_index = False
        # Only add an index when the field exists and has no index yet.
        if "dense" in fields_in_collection and "dense_idx" not in existing_indexes:
            index_params.add_index(
                field_name="dense",
                index_name="dense_idx",
                index_type="AUTOINDEX",
                metric_type="COSINE",
            )
            needs_index = True
        if "sparse" in fields_in_collection and "bm25_idx" not in existing_indexes:
            index_params.add_index(
                field_name="sparse",
                index_name="bm25_idx",
                index_type="SPARSE_INVERTED_INDEX",
                metric_type="BM25",
                params={"inverted_index_algo": "DAAT_MAXSCORE"},
            )
            needs_index = True
        if "permission" in fields_in_collection and "permission" not in existing_indexes:
            index_params.add_index(
                field_name="permission",
                index_type="INVERTED",
                params={"json_cast_type": "VARCHAR"}
            )
            needs_index = True
        if "metadata" in fields_in_collection and "metadata" not in existing_indexes:
            index_params.add_index(
                field_name="metadata",
                index_type="INVERTED",
                params={"json_cast_type": "VARCHAR"}
            )
            needs_index = True
        if needs_index:
            logger.info(f"Creating missing indexes for collection: {name}")
            try:
                self.client.create_index(collection_name=name, index_params=index_params)
            except Exception as e:
                # Best-effort: index creation failure is logged, not fatal.
                logger.error(f"Failed to create index for {name}: {e}")
        # Load the collection into memory so searches can run immediately.
        self.client.load_collection(collection_name=name)
  210. def create_collection(self, name: str, dimension: int = None, description: str = "", fields: List[Dict] = None) -> None:
  211. """
  212. 创建 Milvus 集合
  213. :param dimension: 向量维度,如果为None则使用默认值
  214. :param fields: 自定义字段列表,每个元素为 {"name": "age", "type": "INT64", ...}
  215. """
  216. # 使用默认维度
  217. if dimension is None:
  218. dimension = self.DENSE_DIM
  219. if self.client.has_collection(name):
  220. logger.info(f"Collection {name} already exists.")
  221. return
  222. # 如果有自定义字段,使用 schema 创建
  223. if fields:
  224. from pymilvus import MilvusClient, DataType, Function, FunctionType
  225. # 1. 创建 Schema
  226. schema = MilvusClient.create_schema(
  227. auto_id=True,
  228. enable_dynamic_field=True,
  229. description=description
  230. )
  231. # 检查字段中是否定义了主键
  232. has_primary = any(f.get("is_primary") for f in fields)
  233. if not has_primary:
  234. # 如果没有定义主键,添加默认主键
  235. schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
  236. # 检查是否有默认向量列,如果没有则添加 (兼容旧逻辑,但如果fields里有dense则不添加)
  237. has_vector = any(f.get("type") == "FLOAT_VECTOR" for f in fields)
  238. if not has_vector:
  239. schema.add_field(field_name="dense", datatype=DataType.FLOAT_VECTOR, dim=dimension)
  240. # 3. 添加用户自定义字段
  241. type_map = {
  242. "BOOL": DataType.BOOL,
  243. "INT8": DataType.INT8,
  244. "INT16": DataType.INT16,
  245. "INT32": DataType.INT32,
  246. "INT64": DataType.INT64,
  247. "FLOAT": DataType.FLOAT,
  248. "DOUBLE": DataType.DOUBLE,
  249. "VARCHAR": DataType.VARCHAR,
  250. "JSON": DataType.JSON,
  251. "FLOAT_VECTOR": DataType.FLOAT_VECTOR,
  252. "SPARSE_FLOAT_VECTOR": DataType.SPARSE_FLOAT_VECTOR,
  253. "BM25": DataType.SPARSE_FLOAT_VECTOR # BM25 特殊处理,映射为稀疏向量
  254. }
  255. bm25_field = None
  256. text_field_name = "text" # 默认文本字段名
  257. for f in fields:
  258. field_type_str = f.get("type", "").upper()
  259. dtype = type_map.get(field_type_str)
  260. if not dtype:
  261. continue
  262. # 记录文本字段名,供BM25使用
  263. if f.get("name") in ["text", "content", "chunk"]:
  264. text_field_name = f.get("name")
  265. kwargs = {
  266. "field_name": f.get("name"),
  267. "datatype": dtype,
  268. "description": f.get("description", "")
  269. }
  270. if f.get("is_primary"):
  271. kwargs["is_primary"] = True
  272. kwargs["auto_id"] = True # 假设主键都是自增
  273. if dtype == DataType.VARCHAR:
  274. kwargs["max_length"] = f.get("max_length", 65535)
  275. # 关键修复:如果要被 BM25 引用,必须启用 analyzer
  276. if f.get("name") in ["text", "content", "chunk"]:
  277. kwargs["enable_analyzer"] = True
  278. if dtype == DataType.FLOAT_VECTOR:
  279. kwargs["dim"] = dimension # 使用传入的 dimension
  280. schema.add_field(**kwargs)
  281. # 如果是 BM25 类型,记录下来以便后续添加 Function
  282. if field_type_str == "BM25":
  283. bm25_field = f.get("name")
  284. # 处理 BM25 Function
  285. if bm25_field:
  286. try:
  287. schema.add_function(Function(
  288. name="bm25_fn",
  289. input_field_names=[text_field_name],
  290. output_field_names=[bm25_field],
  291. function_type=FunctionType.BM25
  292. ))
  293. logger.info(f"Added BM25 function mapping {text_field_name} -> {bm25_field}")
  294. except Exception as e:
  295. logger.error(f"Failed to add BM25 function: {e}")
  296. # 4. 准备索引参数
  297. index_params = self.client.prepare_index_params()
  298. # 5. 为所有向量字段添加索引
  299. for f in fields:
  300. ftype = f.get("type", "").upper()
  301. if ftype == "FLOAT_VECTOR":
  302. index_params.add_index(
  303. field_name=f.get("name"),
  304. index_type="AUTOINDEX",
  305. metric_type="IP" # [Modified] 更改为 IP (内积),通常对规范化向量效果更好,与 COSINE 类似但更简单
  306. )
  307. elif ftype == "BM25" or ftype == "SPARSE_FLOAT_VECTOR":
  308. index_params.add_index(
  309. field_name=f.get("name"),
  310. index_type="SPARSE_INVERTED_INDEX", # 稀疏向量索引
  311. metric_type="BM25"
  312. )
  313. # 6. 为自定义标量字段添加索引
  314. for f in fields:
  315. ftype = f.get("type", "").upper()
  316. if ftype in ["VARCHAR", "INT64", "INT32", "BOOL"] and not f.get("is_primary"):
  317. # 排除主键,主键自动索引
  318. index_params.add_index(
  319. field_name=f.get("name"),
  320. index_type="INVERTED"
  321. )
  322. elif ftype == "JSON":
  323. # Milvus 2.4+ JSON 索引必须指定 json_cast_type
  324. # 这里为 JSON 字段添加默认索引,以便支持查询
  325. index_params.add_index(
  326. field_name=f.get("name"),
  327. index_type="INVERTED",
  328. params={"json_cast_type": "VARCHAR"}
  329. )
  330. # 7. 创建集合
  331. self.client.create_collection(
  332. collection_name=name,
  333. schema=schema,
  334. index_params=index_params
  335. )
  336. else:
  337. # 使用简化的 create_collection API
  338. self.client.create_collection(
  339. collection_name=name,
  340. dimension=dimension,
  341. description=description,
  342. auto_id=True, # 自动生成 ID
  343. id_type="int", # ID 类型
  344. metric_type="IP" # [Modified] 默认使用 IP
  345. )
  346. logger.info(f"Created collection {name} with dimension {dimension}")
  347. def drop_collection(self, name: str) -> None:
  348. """删除 Milvus 集合"""
  349. if self.client.has_collection(name):
  350. self.client.drop_collection(name)
  351. logger.info(f"Dropped collection {name}")
  352. def has_collection(self, name: str) -> bool:
  353. """检查集合是否存在"""
  354. return self.client.has_collection(name)
  355. def get_collection_details(self) -> List[Dict[str, Any]]:
  356. """
  357. 获取所有 Collections 详细信息
  358. """
  359. details: List[Dict[str, Any]] = []
  360. names = self.client.list_collections()
  361. for name in names:
  362. desc = self.client.describe_collection(collection_name=name)
  363. stats = self.client.get_collection_stats(collection_name=name)
  364. load_state = self.client.get_load_state(collection_name=name)
  365. # ===== 时间戳转换(按你指定写法,无封装)=====
  366. created_time = None
  367. updated_time = None
  368. if desc.get("created_timestamp") is not None:
  369. ts_int = int(desc["created_timestamp"])
  370. physical_ms = ts_int >> 18
  371. created_time = datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  372. if desc.get("update_timestamp") is not None:
  373. ts_int = int(desc["update_timestamp"])
  374. physical_ms = ts_int >> 18
  375. updated_time = datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  376. # ===== 数量:不保底(要求返回结构必须有 row_count)=====
  377. entity_count = stats["row_count"]
  378. # ===== 状态:不保底(要求返回结构必须有 state)=====
  379. status = load_state["state"]
  380. details.append(
  381. {
  382. "name": name,
  383. "status": status,
  384. "entity_count": entity_count,
  385. "description": desc.get("description", ""),
  386. "created_time": created_time,
  387. "updated_time": updated_time,
  388. }
  389. )
  390. logger.info(f"成功获取Collections详细信息,共{len(details)}个")
  391. return details
  392. def get_collection_state(self, name: str) -> str:
  393. """获取集合加载状态"""
  394. try:
  395. load_state = self.client.get_load_state(collection_name=name)
  396. state = load_state.get("state") if isinstance(load_state, dict) else load_state
  397. return state
  398. except Exception as e:
  399. logger.error(f"Failed to get collection state for {name}: {e}")
  400. return "Unknown"
  401. def set_collection_state(self, name: str, action: str) -> Dict[str, Any]:
  402. """
  403. 改变指定 Collection 的加载状态。
  404. 参数:
  405. - name: 集合名称
  406. - action: 操作,取值 'load' 或 'release'
  407. 返回:
  408. - 包含集合名称和当前状态的字典,例如: {"name": name, "state": "Loaded"}
  409. """
  410. action_norm = (action or "").strip().lower()
  411. if action_norm not in {"load", "release"}:
  412. raise ValueError("action 必须为 'load' 或 'release'")
  413. # 执行加载/释放
  414. if action_norm == "load":
  415. self.client.load_collection(collection_name=name)
  416. else:
  417. self.client.release_collection(collection_name=name)
  418. # 返回最新状态
  419. load_state = self.client.get_load_state(collection_name=name)
  420. state = load_state.get("state") if isinstance(load_state, dict) else load_state
  421. result = {"name": name, "state": state, "action": action_norm}
  422. logger.info(f"集合 {name} 状态更新为 {state} (action={action_norm})")
  423. return result
  424. def delete_collection_if_empty(self, name: str) -> Dict[str, Any]:
  425. """仅当集合内容为空时删除集合,否则抛出异常"""
  426. stats = self.client.get_collection_stats(collection_name=name)
  427. row_count = stats.get("row_count") if isinstance(stats, dict) else None
  428. if row_count is None:
  429. raise ValueError("无法获取集合行数,禁止删除")
  430. if int(row_count) > 0:
  431. raise ValueError("集合内容不为空,不能删除")
  432. self.client.drop_collection(collection_name=name)
  433. logger.info(f"集合 {name} 已删除")
  434. return {"name": name, "deleted": True}
  435. def get_collection_detail(self, name: str) -> Dict[str, Any]:
  436. """获取单个集合的详细信息,包含schema、索引等所有desc字段"""
  437. desc = self.client.describe_collection(collection_name=name)
  438. stats = self.client.get_collection_stats(collection_name=name)
  439. load_state = self.client.get_load_state(collection_name=name)
  440. # 时间戳转换
  441. created_time = None
  442. updated_time = None
  443. if desc.get("created_timestamp") is not None:
  444. ts_int = int(desc["created_timestamp"])
  445. physical_ms = ts_int >> 18
  446. created_time = datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  447. if desc.get("update_timestamp") is not None:
  448. ts_int = int(desc["update_timestamp"])
  449. physical_ms = ts_int >> 18
  450. updated_time = datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  451. entity_count = stats.get("row_count", 0)
  452. status = load_state.get("state") if isinstance(load_state, dict) else load_state
  453. # 提取字段schema
  454. fields = []
  455. if "fields" in desc:
  456. for field in desc["fields"]:
  457. field_info = {
  458. "name": field.get("name"),
  459. "type": str(field.get("type")),
  460. "description": field.get("description", ""),
  461. "is_primary": field.get("is_primary", False),
  462. "auto_id": field.get("auto_id"),
  463. }
  464. # 向量维度
  465. if "params" in field and "dim" in field["params"]:
  466. field_info["dim"] = field["params"]["dim"]
  467. # 字符串长度
  468. if "params" in field and "max_length" in field["params"]:
  469. field_info["max_length"] = field["params"]["max_length"]
  470. # 其他params
  471. if "params" in field:
  472. field_info["params"] = field["params"]
  473. fields.append(field_info)
  474. # 提取索引信息
  475. indices = []
  476. # 尝试从 describe_collection 结果中获取 (兼容旧逻辑)
  477. if "indexes" in desc:
  478. for idx in desc["indexes"]:
  479. index_info = {
  480. "field_name": idx.get("field_name"),
  481. "index_name": idx.get("index_name"),
  482. "index_type": idx.get("index_type"),
  483. "metric_type": idx.get("metric_type"),
  484. "params": idx.get("params"),
  485. }
  486. indices.append(index_info)
  487. # 如果没有获取到索引信息,尝试主动查询 list_indexes
  488. if not indices:
  489. try:
  490. # 获取索引列表 (通常返回索引名称列表)
  491. index_names = self.client.list_indexes(collection_name=name)
  492. if index_names:
  493. for idx_name in index_names:
  494. try:
  495. # 获取索引详情
  496. idx_desc = self.client.describe_index(collection_name=name, index_name=idx_name)
  497. if idx_desc:
  498. indices.append({
  499. "field_name": idx_desc.get("field_name"),
  500. "index_name": idx_desc.get("index_name"),
  501. "index_type": idx_desc.get("index_type"),
  502. "metric_type": idx_desc.get("metric_type"),
  503. "params": idx_desc.get("params"),
  504. })
  505. except Exception:
  506. continue
  507. except Exception as e:
  508. logger.warning(f"Failed to list/describe indexes for {name}: {e}")
  509. detail = {
  510. "name": name,
  511. "description": desc.get("description", ""),
  512. "status": status,
  513. "entity_count": entity_count,
  514. "created_time": created_time,
  515. "updated_time": updated_time,
  516. "fields": fields,
  517. "enable_dynamic_field": desc.get("enable_dynamic_field", False),
  518. "consistency_level": desc.get("consistency_level"),
  519. "num_shards": desc.get("num_shards"),
  520. "num_partitions": desc.get("num_partitions"),
  521. "indices": indices,
  522. "properties": desc.get("properties"),
  523. "aliases": desc.get("aliases", []),
  524. }
  525. logger.info(f"成功获取集合 {name} 的详细信息")
  526. return detail
  527. def update_collection_description(self, name: str, description: str) -> Dict[str, Any]:
  528. """使用 alter_collection_properties 更新集合描述"""
  529. description = description or ""
  530. # 1. 更新集合 description(唯一修改点)
  531. self.client.alter_collection_properties(
  532. collection_name=name,
  533. properties={"collection.description": description},
  534. )
  535. # 2. 重新获取集合信息
  536. desc = self.client.describe_collection(collection_name=name)
  537. print(desc)
  538. stats = self.client.get_collection_stats(collection_name=name)
  539. load_state = self.client.get_load_state(collection_name=name)
  540. # 3. 时间戳转换(Milvus TSO -> 物理时间)
  541. def ts_to_str(ts):
  542. if ts is None:
  543. return None
  544. ts_int = int(ts)
  545. physical_ms = ts_int >> 18
  546. return datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  547. created_time = ts_to_str(desc.get("created_timestamp"))
  548. updated_time = ts_to_str(desc.get("update_timestamp"))
  549. entity_count = stats.get("row_count") if isinstance(stats, dict) else None
  550. status = load_state.get("state") if isinstance(load_state, dict) else load_state
  551. return {
  552. "name": name,
  553. "status": status,
  554. "entity_count": entity_count,
  555. "description": desc.get("description", ""),
  556. "created_time": created_time,
  557. "updated_time": updated_time,
  558. }
  559. def hybrid_search(self, collection_name: str, query_text: str,
  560. top_k: int = 3, ranker_type: str = "weighted",
  561. dense_weight: float = 0.7, sparse_weight: float = 0.3,
  562. expr: str = None):
  563. """
  564. 混合搜索(参考 test_hybrid_v2.6.py 的实现)
  565. Args:
  566. param: 包含collection_name的参数字典
  567. query_text: 查询文本
  568. top_k: 返回结果数量
  569. ranker_type: 重排序类型 "weighted" 或 "rrf"
  570. dense_weight: 密集向量权重(当ranker_type="weighted"时使用)
  571. sparse_weight: 稀疏向量权重(当ranker_type="weighted"时使用)
  572. expr: 过滤表达式 (Metadata Filtering)
  573. Returns:
  574. List[Dict]: 搜索结果列表
  575. """
  576. try:
  577. collection_name = collection_name
  578. # 确保集合已加载
  579. self.client.load_collection(collection_name)
  580. # 获取 vectorstore 实例(包含 Milvus 和 BM25BuiltInFunction)
  581. vectorstore = get_milvus_vectorstore(
  582. collection_name=collection_name,
  583. consistency_level="Strong"
  584. )
  585. # 执行混合搜索 (完全按照 test_hybrid_v2.6.py 的逻辑)
  586. # 注意:LangChain Milvus vectorstore 的 similarity_search 支持 expr 参数用于过滤
  587. if ranker_type == "weighted":
  588. results = vectorstore.similarity_search(
  589. query=query_text,
  590. k=top_k,
  591. expr=expr,
  592. ranker_type="weighted",
  593. ranker_params={"weights": [dense_weight, sparse_weight]}
  594. )
  595. else: # rrf
  596. results = vectorstore.similarity_search(
  597. query=query_text,
  598. k=top_k,
  599. expr=expr,
  600. ranker_type="rrf",
  601. ranker_params={"k": 60}
  602. )
  603. # 格式化结果,保持与其他搜索方法一致
  604. formatted_results = []
  605. for doc in results:
  606. formatted_results.append({
  607. 'id': doc.metadata.get('pk', 0),
  608. 'text_content': doc.page_content,
  609. 'metadata': doc.metadata,
  610. 'distance': 0.0,
  611. 'similarity': 1.0
  612. })
  613. logger.info(f"Hybrid search returned {len(formatted_results)} results")
  614. return formatted_results
  615. except Exception as e:
  616. logger.error(f"Error in hybrid search: {e}")
  617. # 回退到传统的向量搜索
  618. logger.info("Falling back to traditional vector search")
  619. return []
# Optional: module-level singleton instance.
milvus_service = MilvusService()
if __name__ == "__main__":
    # Recommended invocation:
    # uv run python -m src.app.services.milvus_service
    import json
    service = MilvusService()
    # Smoke-test hybrid_search.
    print("=" * 50)
    print("测试混合检索 (Hybrid Search)")
    print("=" * 50)
    try:
        # Example parameters; adjust to the actual deployment.
        collection_name = "first_bfp_collection_status"
        query_text = "《公路水运工程临时用电技术规程》(JTT1499-2024)状态为现行"  # replace with a real query
        # Weighted re-ranking mode.
        print("\n1. 测试 Weighted 重排序模式:")
        print(f" 集合: {collection_name}")
        print(f" 查询: {query_text}")
        print(f" 密集权重: 0.7, 稀疏权重: 0.3")
        results_weighted = service.hybrid_search(
            collection_name=collection_name,
            query_text=query_text,
            top_k=5,
            ranker_type="weighted",
            dense_weight=0.7,
            sparse_weight=0.3
        )
        print(f"\n 结果数量: {len(results_weighted)}")
        for i, result in enumerate(results_weighted, 1):
            print(f" [{i}] ID: {result.get('id')}, Text: {result.get('text_content')[:50]}...")
        # RRF (Reciprocal Rank Fusion) re-ranking mode.
        print("\n2. 测试 RRF (Reciprocal Rank Fusion) 重排序模式:")
        print(f" 集合: {collection_name}")
        print(f" 查询: {query_text}")
        results_rrf = service.hybrid_search(
            collection_name=collection_name,
            query_text=query_text,
            top_k=5,
            ranker_type="rrf"
        )
        print(f"\n 结果数量: {len(results_rrf)}")
        for i, result in enumerate(results_rrf, 1):
            print(f" [{i}] ID: {result.get('id')}, Text: {result.get('text_content')[:50]}...")
        print("\n✓ 混合检索测试完成")
    except Exception as e:
        print(f"\n✗ 混合检索测试失败: {e}")
        import traceback
        traceback.print_exc()
    # Also dump details for all collections.
    print("\n" + "=" * 50)
    print("获取所有集合信息:")
    print("=" * 50)
    data = service.get_collection_details()
    for item in data:
        print(json.dumps(item, ensure_ascii=False, indent=2))