retrieval_service.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. # -*- coding: utf-8 -*-
  2. """质量优先的多路向量检索服务。
  3. 四路召回架构:
  4. 1. parent_vector:父表向量检索(主体内容向量)
  5. 2. child_locator:子表向量定位 → 反查父行(精确定位片段)
  6. 3. tag_keyword:标签关键词匹配(设备型号、标准号等)
  7. 4. chapter_similarity:章节相似度检索(同类型章节参考)
  8. 合并策略:
  9. - RRF(Reciprocal Rank Fusion)融合多路排名
  10. - 按路径加权:parent_vector 1.0, child_locator 0.8, tag 1.2, chapter 0.5
  11. - 多源加分:同一条候选在多个路径中被召回时额外加分
  12. - 标签匹配加分:关键词出现在 tag_list 或文本中时额外加分
  13. - Scope 匹配加分:与当前项目/章节范围一致时额外加分
  14. 去重策略:
  15. - candidate_key 去重(基于 document_id + parent_id + chunk_id)
  16. - 内容哈希去重(同一文件同一文本内容仅保留一条)
  17. """
  18. from __future__ import annotations
  19. from typing import Any, Callable, Dict, List, Optional
  20. from core.document_chat.component.document_chat_logger import document_chat_logger as logger
  21. from core.document_chat.retrieval.candidate import (
  22. build_candidate_key,
  23. clean_candidates,
  24. merge_metadata,
  25. metadata_value,
  26. normalize_metadata,
  27. normalize_row_metadata,
  28. )
  29. from core.document_chat.retrieval.config import RetrievalConfig, default_warnings, load_retrieval_config
  30. from core.document_chat.retrieval.fusion import calc_tag_bonus, merge_recall_results
  31. from core.document_chat.retrieval.query_builder import (
  32. build_query as build_retrieval_query,
  33. build_query_keywords as build_retrieval_query_keywords,
  34. )
  35. from core.document_chat.retrieval.scope import (
  36. build_filter_expr,
  37. build_tag_expr,
  38. extract_scope,
  39. has_reliable_scope,
  40. metadata_matches_scope,
  41. select_tag_terms,
  42. )
  43. from core.document_chat.retrieval.utils import (
  44. combine_expr as _combine_expr,
  45. escape_milvus_string as _escape_milvus_string,
  46. pack_log_items as _pack_log_items,
  47. to_float as _to_float,
  48. )
  class DocumentChatRetrievalService:
      """Build retrieval queries and recall candidates from the vector store.

      Core flow:
      1. ``build_query``: turn the workflow state into a retrieval query.
      2. ``recall``: run the recall paths, fuse their results, then clean
         and de-duplicate the candidates.
      """

      # Fields requested from the parent collection on scalar queries.
      PARENT_OUTPUT_FIELDS = [
          "pk",
          "text",
          "document_id",
          "parent_id",
          "index",
          "tag_list",
          "metadata",
          "file_name",
          "chapter_title",
          "chapter_level_1",
          "chapter_level_2",
          "chapter_level_3",
      ]

      # Fields requested from the child collection. Currently the same columns
      # as the parent list, but kept as an independent list object.
      CHILD_OUTPUT_FIELDS = list(PARENT_OUTPUT_FIELDS)
  67. def __init__(self, config: Optional[RetrievalConfig] = None):
  68. self.config = config or load_retrieval_config()
  69. # ============================================================
  70. # Query 构建
  71. # ============================================================
  def build_query(self, state: Dict[str, Any]) -> str:
      """Build the retrieval query string for ``state``.

      Delegates to the shared query builder, passing the configured domain
      and action term lists.
      """
      settings = self.config
      return build_retrieval_query(
          state,
          domain_terms=settings.keyword_domain_terms,
          action_terms=settings.keyword_action_terms,
      )
  def build_query_keywords(self, state: Dict[str, Any], query: Optional[str] = None) -> List[str]:
      """Extract retrieval keywords for ``state`` and an optional ``query``.

      Delegates to the shared keyword builder, passing the configured domain
      and action term lists.
      """
      settings = self.config
      return build_retrieval_query_keywords(
          state,
          query,
          domain_terms=settings.keyword_domain_terms,
          action_terms=settings.keyword_action_terms,
      )
  87. # ============================================================
  88. # 主召回入口
  89. # ============================================================
  90. def recall(self, state: Dict[str, Any]) -> Dict[str, Any]:
  91. """执行多路向量召回,RRF 合并,去重过滤。
  92. 返回:
  93. - retrieval_candidates:去重后的候选列表
  94. - retrieval_status:recalled / no_scope / no_recall / disabled
  95. - retrieval_metrics:各路径召回统计
  96. - retrieval_steps:每步详细日志
  97. """
  98. if not self.config.enabled:
  99. return self._empty_result("disabled", [], retrieval_method="disabled")
  100. query = str(state.get("retrieval_query") or "").strip()
  101. if not query:
  102. return self._empty_result("no_recall", [self._warning("no_recall")], retrieval_method="empty_query")
  103. # 提取检索范围(项目ID、工程类型、章节分类等)
  104. scope = self._extract_scope(state)
  105. if not self._has_reliable_scope(scope) and not self.config.allow_unscoped_search:
  106. return self._empty_result(
  107. "no_scope",
  108. [self._warning("no_scope")],
  109. retrieval_method="no_scope",
  110. retrieval_scope=scope,
  111. )
  112. keywords = list(state.get("retrieval_keywords") or self.build_query_keywords(state, query))
  113. retrieval_steps: List[Dict[str, Any]] = []
  114. source_results: Dict[str, List[Dict[str, Any]]] = {}
  115. # ===== 四路召回 =====
  116. source_results["parent_vector"] = self._run_recall_path(
  117. "parent_vector",
  118. lambda: self._recall_by_parent_vector(scope, query),
  119. retrieval_steps,
  120. query=query,
  121. scope=scope,
  122. )
  123. source_results["child_locator"] = self._run_recall_path(
  124. "child_locator",
  125. lambda: self._recall_by_child_locator(scope, query),
  126. retrieval_steps,
  127. query=query,
  128. scope=scope,
  129. )
  130. if self.config.tag_recall_enabled:
  131. source_results["tag"] = self._run_recall_path(
  132. "tag",
  133. lambda: self._recall_by_tag(scope, keywords),
  134. retrieval_steps,
  135. query=" ".join(keywords[: self.config.tag_terms_limit]),
  136. scope=scope,
  137. )
  138. if scope.get("chapter_level_1") and scope.get("chapter_level_2"):
  139. source_results["chapter_similarity"] = self._run_recall_path(
  140. "chapter_similarity",
  141. lambda: self._recall_by_chapter(scope, query),
  142. retrieval_steps,
  143. query=query,
  144. scope=scope,
  145. )
  146. # ===== RRF 合并 + 去重 =====
  147. merged_candidates = self._merge_recall_results(source_results, scope, keywords)
  148. cleaned = self._clean_candidates(merged_candidates)
  149. retrieval_steps.append(
  150. {
  151. "step": "rrf_merge",
  152. "query": query,
  153. "scope": {key: value for key, value in scope.items() if value},
  154. "count": len(merged_candidates),
  155. "items": _pack_log_items(merged_candidates),
  156. }
  157. )
  158. retrieval_steps.append(
  159. {
  160. "step": "clean_candidates",
  161. "count": len(cleaned),
  162. "items": _pack_log_items(cleaned),
  163. }
  164. )
  165. if not cleaned:
  166. return self._empty_result(
  167. "no_recall",
  168. [self._warning("no_recall")],
  169. retrieval_method="multi_path_rrf",
  170. retrieval_scope=scope,
  171. retrieval_steps=retrieval_steps,
  172. )
  173. source_counts = {source: len(items or []) for source, items in source_results.items()}
  174. # 日志:区分请求的 scope、实际应用的过滤、实际召回的文件
  175. applied_expr = self._build_filter_expr(scope)
  176. actual_files = list(dict.fromkeys(
  177. str(item.get("source", ""))[:40]
  178. for item in cleaned
  179. if item.get("source")
  180. ))[:5]
  181. logger.info(
  182. f"[DocumentChat] recall completed: method=multi_path_rrf "
  183. f"requested_scope={dict((k, v) for k, v in scope.items() if v)} "
  184. f"applied_filter='{applied_expr}' "
  185. f"actual_sources={actual_files} "
  186. f"source_counts={source_counts} "
  187. f"total={len(cleaned)} max_sim={max((item.get('vector_similarity', 0.0) for item in cleaned), default=0.0):.4f}"
  188. )
  189. metrics = {
  190. "recall_count": len(cleaned),
  191. "merged_count": len(merged_candidates),
  192. "source_counts": source_counts,
  193. "max_vector_similarity": max((item.get("vector_similarity", 0.0) for item in cleaned), default=0.0),
  194. "max_fusion_score": max((item.get("fusion_score", 0.0) for item in cleaned), default=0.0),
  195. "scope": {key: value for key, value in scope.items() if value},
  196. "retrieval_method": "multi_path_rrf",
  197. }
  198. return {
  199. "retrieval_candidates": cleaned,
  200. "retrieval_steps": retrieval_steps,
  201. "retrieval_status": "recalled",
  202. "retrieval_method": "multi_path_rrf",
  203. "retrieval_metrics": metrics,
  204. "warnings": [],
  205. }
  def _run_recall_path(
      self,
      step: str,
      func: Callable[[], List[Dict[str, Any]]],
      retrieval_steps: List[Dict[str, Any]],
      query: str,
      scope: Dict[str, Any],
  ) -> List[Dict[str, Any]]:
      """Run one recall path and record its outcome in ``retrieval_steps``.

      A failure in one path must not stop the others, so any exception is
      logged as a warning, recorded as an ``error`` step, and turned into an
      empty result.

      Args:
          step: Name of the recall path, used as the step label.
          func: Zero-argument callable that performs the recall.
          retrieval_steps: Step log; exactly one entry is appended.
          query: Query text recorded with the step.
          scope: Retrieval scope; only its truthy entries are recorded.

      Returns:
          The candidates produced by ``func`` (``[]`` when it returns a
          falsy value or raises).
      """
      try:
          found = func() or []
          entry = {
              "step": step,
              "query": query,
              "scope": {key: value for key, value in scope.items() if value},
              "count": len(found),
              "items": _pack_log_items(found),
          }
      except Exception as exc:
          # Best-effort boundary: swallow, log with traceback, keep going.
          logger.warning(f"[DocumentChat] {step} recall failed: {exc}", exc_info=True)
          found = []
          entry = {
              "step": step,
              "query": query,
              "scope": {key: value for key, value in scope.items() if value},
              "count": 0,
              "error": str(exc),
              "items": [],
          }
      retrieval_steps.append(entry)
      return found
  240. # ============================================================
  241. # 四路召回具体实现
  242. # ============================================================
  def _recall_by_parent_vector(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
      """Recall parent documents through a Milvus hybrid (dense + sparse) search.

      Args:
          scope: Retrieval scope, turned into the Milvus filter expression.
          query: Query text handed to the hybrid search.

      Returns:
          One ``parent_vector`` candidate per result row that carries text.
      """
      from foundation.database.base.vector.milvus_vector import MilvusVectorManager

      expr = self._build_filter_expr(scope)
      results = MilvusVectorManager().hybrid_search(
          param={"collection_name": self.config.parent_collection, "expr": expr},
          query_text=query,
          top_k=self.config.parent_recall_top_k,
          ranker_type=self.config.ranker_type,
          dense_weight=self.config.dense_weight,
          sparse_weight=self.config.sparse_weight,
      )

      def row_text(row: Dict[str, Any]) -> str:
          # Same lookup as _candidate_from_vector_row ("text_content", then
          # "text"), so a row the builder can use is never filtered out.
          return str(row.get("text_content") or row.get("text") or "").strip()

      return [
          self._candidate_from_vector_row(row, "parent_vector", scope)
          for row in results
          if row_text(row)
      ]
  def _recall_by_child_locator(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
      """Locate hits in the child collection, then load their parent rows.

      The query is run as a hybrid search against the child collection; the
      hits are grouped by ``parent_id`` and the matching parent rows are
      fetched so each candidate carries the parent text.

      Args:
          scope: Retrieval scope, used for both the child search filter and
              the parent lookup.
          query: Query text handed to the hybrid search.

      Returns:
          One ``child_locator`` candidate per fetched parent row. Its
          similarity is the best similarity among that parent's child hits;
          ``child_hit_count`` and up to five ``matched_child_texts`` are
          stored in its metadata.
      """
      from foundation.database.base.vector.milvus_vector import MilvusVectorManager

      settings = self.config
      expr = self._build_filter_expr(scope)
      child_rows = MilvusVectorManager().hybrid_search(
          param={"collection_name": settings.child_collection, "expr": expr},
          query_text=query,
          top_k=settings.child_recall_top_k,
          ranker_type=settings.ranker_type,
          dense_weight=settings.child_dense_weight,
          sparse_weight=settings.child_sparse_weight,
      )

      # Group child hits by parent id, keeping first-seen order.
      # NOTE(review): parent_id is read from the row's metadata only; a
      # top-level "parent_id" on the row is ignored here — confirm the
      # hybrid_search row shape.
      hits_by_parent: Dict[str, List[Dict[str, Any]]] = {}
      for child in child_rows:
          child_meta = self._normalize_row_metadata(child.get("metadata") or {})
          owner_id = str(self._metadata_value(child_meta, "parent_id") or "").strip()
          if owner_id:
              hits_by_parent.setdefault(owner_id, []).append(child)

      # Reverse lookup: fetch the parent rows for every parent id that was hit.
      parents = self._fetch_parent_rows_by_parent_ids(list(hits_by_parent), scope)

      located: List[Dict[str, Any]] = []
      for parent in parents:
          owner_id = str(parent.get("parent_id") or "").strip()
          hits = hits_by_parent.get(owner_id) or []
          best = max((_to_float(hit.get("similarity"), 0.0) for hit in hits), default=0.0)
          candidate = self._candidate_from_parent_row(parent, "child_locator", scope, best)

          details = candidate.setdefault("metadata", {})
          details["child_hit_count"] = len(hits)
          snippets = []
          for hit in hits[:5]:
              snippet = str(hit.get("text_content") or "").strip()
              if snippet:
                  snippets.append(snippet)
          details["matched_child_texts"] = snippets
          located.append(candidate)
      return located
  def _recall_by_tag(self, scope: Dict[str, Any], keywords: List[str]) -> List[Dict[str, Any]]:
      """Recall parent rows whose tags (or whose children's tags) match keywords.

      Tag terms are selected from ``keywords``, turned into a tag expression
      and combined with the scope filter. The combined filter is run against
      both the parent and the child collection; child matches are mapped back
      to their parent rows.

      Every candidate starts from ``config.min_vector_similarity`` and is then
      multiplied by 0.7, because tag matching tends to over-match.

      Args:
          scope: Retrieval scope used for filtering and scope validation.
          keywords: Candidate keywords to pick tag terms from.

      Returns:
          ``tag`` candidates; ``[]`` when no tag term is selected.
      """
      tag_terms = self._select_tag_terms(keywords)
      if not tag_terms:
          return []

      tag_expr = self._build_tag_expr(tag_terms)
      scope_expr = self._build_filter_expr(scope)
      expr = _combine_expr(scope_expr, tag_expr)

      def tag_candidate(row: Dict[str, Any]) -> Dict[str, Any]:
          return self._candidate_from_parent_row(row, "tag", scope, self.config.min_vector_similarity)

      # Direct matches on the parent collection.
      parent_rows = self._condition_query(
          collection_name=self.config.parent_collection,
          filter_expr=expr,
          output_fields=self.PARENT_OUTPUT_FIELDS,
          limit=self.config.tag_recall_top_k,
      )
      candidates = [tag_candidate(row) for row in parent_rows]

      # Matches on the child collection, mapped back to their parent rows.
      child_rows = self._condition_query(
          collection_name=self.config.child_collection,
          filter_expr=expr,
          output_fields=self.CHILD_OUTPUT_FIELDS,
          limit=self.config.tag_recall_top_k,
      )
      child_parent_ids: List[str] = []
      texts_by_parent: Dict[str, List[str]] = {}
      for child in child_rows:
          owner_id = str(child.get("parent_id") or self._metadata_value(child, "parent_id") or "").strip()
          if not owner_id:
              continue
          child_parent_ids.append(owner_id)
          child_text = str(child.get("text") or "").strip()
          if child_text:
              texts_by_parent.setdefault(owner_id, []).append(child_text)

      # NOTE(review): a parent matched both directly and through a child is
      # appended twice here — presumably collapsed later by key-based
      # de-duplication; confirm in the merge/clean helpers.
      for parent in self._fetch_parent_rows_by_parent_ids(child_parent_ids, scope):
          owner_id = str(parent.get("parent_id") or "").strip()
          candidate = tag_candidate(parent)
          details = candidate.setdefault("metadata", {})
          details["matched_child_texts"] = texts_by_parent.get(owner_id, [])[:5]
          candidates.append(candidate)

      for candidate in candidates:
          # Discount tag hits so they cannot dominate by over-matching.
          candidate["vector_similarity"] *= 0.7

          # Record which tag terms literally occur in tags, text or child text.
          details = candidate.setdefault("metadata", {})
          searchable = " ".join(
              [
                  str(details.get("tag_list") or ""),
                  str(candidate.get("text") or ""),
                  " ".join(details.get("matched_child_texts") or []),
              ]
          )
          details["tag_match_terms"] = [term for term in tag_terms if term and term in searchable]
      return candidates
  def _recall_by_chapter(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
      """Recall similar fragments restricted to the scope's chapter levels.

      Calls ``search_similar_fragments`` with ``chapter_level_1`` and
      ``chapter_level_2`` from ``scope`` and wraps every returned row as a
      ``chapter_similarity`` candidate.

      Args:
          scope: Retrieval scope; supplies the chapter levels and the
              tenant / project / knowledge-base ids copied into metadata.
          query: Search text.

      Returns:
          One candidate per returned row.
      """
      from core.construction_write.component.similar_fragment_service import search_similar_fragments

      level1 = str(scope.get("chapter_level_1") or "")
      level2 = str(scope.get("chapter_level_2") or "")
      rows = search_similar_fragments(
          level1=level1,
          level2=level2,
          search_text=query,
          top_k=self.config.chapter_recall_top_k,
      )

      def to_candidate(row: Dict[str, Any]) -> Dict[str, Any]:
          # NOTE(review): only the two chapter levels are passed to the
          # search; tenant/project/knowledge-base ids are copied from the
          # scope rather than read from the row — confirm the fragment
          # service applies its own isolation.
          details = {
              "tenant_id": scope.get("tenant_id") or "",
              "project_id": scope.get("project_id") or "",
              "knowledge_base_id": scope.get("knowledge_base_id") or "",
              "file_name": row.get("file_name") or "",
              "chapter_level_1": row.get("chapter_level_1") or scope.get("chapter_level_1") or "",
              "chapter_level_2": row.get("chapter_level_2") or scope.get("chapter_level_2") or "",
              "parent_count": row.get("parent_count", 0),
              # Marked valid unconditionally: the search is chapter-limited.
              "source_scope_valid": True,
          }
          body = str(row.get("text") or "").strip()
          return {
              "candidate_key": self._build_candidate_key({**row, "metadata": details}, body),
              "text": body,
              "source": details.get("file_name") or "向量知识库",
              "vector_similarity": _to_float(row.get("similarity"), 0.0),
              "fusion_score": 0.0,
              "metadata": details,
              "source_hits": {},
              "retrieval_source": "chapter_similarity",
          }

      return [to_candidate(row) for row in rows]
  398. # ============================================================
  399. # RRF 合并
  400. # ============================================================
  def _merge_recall_results(
      self,
      source_results: Dict[str, List[Dict[str, Any]]],
      scope: Dict[str, Any],
      keywords: List[str],
  ) -> List[Dict[str, Any]]:
      """Fuse the per-path recall results into one candidate list.

      Delegates to ``merge_recall_results`` with this service's configuration.
      """
      fused = merge_recall_results(source_results, scope, keywords, self.config)
      return fused
  409. # ============================================================
  410. # Milvus 查询辅助
  411. # ============================================================
  def _fetch_parent_rows_by_parent_ids(self, parent_ids: List[str], scope: Dict[str, Any]) -> List[Dict[str, Any]]:
      """Fetch parent rows for the given parent ids, one query per id.

      Ids are stringified, stripped and de-duplicated in first-seen order;
      at most ``config.recall_top_k`` distinct ids are queried. Each query
      combines ``parent_id == '<id>'`` with the scope filter.

      Args:
          parent_ids: Parent ids, possibly with blanks and duplicates.
          scope: Retrieval scope used to build the extra filter.

      Returns:
          All rows returned by the per-id queries, in id order.
      """
      stripped = (str(raw or "").strip() for raw in parent_ids)
      unique_ids = list(dict.fromkeys(value for value in stripped if value))

      rows: List[Dict[str, Any]] = []
      scope_expr = self._build_filter_expr(scope)
      for current_id in unique_ids[: self.config.recall_top_k]:
          expr = _combine_expr(f"parent_id == '{_escape_milvus_string(current_id)}'", scope_expr)
          fetched = self._condition_query(
              collection_name=self.config.parent_collection,
              filter_expr=expr,
              output_fields=self.PARENT_OUTPUT_FIELDS,
              limit=100,
          )
          rows.extend(fetched)
      return rows
  435. def _condition_query(
  436. self,
  437. collection_name: str,
  438. filter_expr: str,
  439. output_fields: List[str],
  440. limit: int,
  441. ) -> List[Dict[str, Any]]:
  442. """Milvus 条件查询(非向量),按 filter 表达式筛选文档。"""
  443. from core.construction_write.component.similar_fragment_service import get_milvus_manager
  444. if not filter_expr:
  445. return []
  446. return get_milvus_manager().condition_query(
  447. collection_name=collection_name,
  448. filter=filter_expr,
  449. output_fields=output_fields,
  450. limit=limit,
  451. )
  452. # ============================================================
  453. # 候选构建
  454. # ============================================================
  def _candidate_from_vector_row(self, row: Dict[str, Any], source: str, scope: Dict[str, Any]) -> Dict[str, Any]:
      """Build a standard candidate dict from a hybrid-search result row.

      Args:
          row: Result row; text is read from ``text_content`` (falling back
              to ``text``), similarity from ``similarity``.
          source: Name of the recall path, stored as ``retrieval_source``.
          scope: Retrieval scope used to set ``source_scope_valid``.

      Returns:
          The candidate with ``fusion_score`` 0.0 and empty ``source_hits``.
      """
      details = self._normalize_row_metadata(row.get("metadata") or {})
      body = str(row.get("text_content") or row.get("text") or "").strip()
      details["source_scope_valid"] = self._metadata_matches_scope(details, scope)
      return dict(
          candidate_key=self._build_candidate_key(details, body),
          text=body,
          source=details.get("file_name") or details.get("title") or "向量知识库",
          vector_similarity=_to_float(row.get("similarity"), 0.0),
          fusion_score=0.0,
          metadata=details,
          source_hits={},
          retrieval_source=source,
      )
  def _candidate_from_parent_row(
      self,
      row: Dict[str, Any],
      source: str,
      scope: Dict[str, Any],
      vector_similarity: float,
  ) -> Dict[str, Any]:
      """Build a standard candidate dict from a parent-collection row.

      Args:
          row: Parent row; the whole row is normalised into the metadata and
              its ``text`` field becomes the candidate text.
          source: Name of the recall path, stored as ``retrieval_source``.
          scope: Retrieval scope used to set ``source_scope_valid``.
          vector_similarity: Similarity assigned by the caller.

      Returns:
          The candidate with ``fusion_score`` 0.0 and empty ``source_hits``.
      """
      details = self._normalize_row_metadata(row)
      body = str(row.get("text") or "").strip()
      details["source_scope_valid"] = self._metadata_matches_scope(details, scope)
      return dict(
          candidate_key=self._build_candidate_key(details, body),
          text=body,
          source=details.get("file_name") or "向量知识库",
          vector_similarity=_to_float(vector_similarity, 0.0),
          fusion_score=0.0,
          metadata=details,
          source_hits={},
          retrieval_source=source,
      )
  491. # ============================================================
  492. # Scope 提取与过滤
  493. # ============================================================
  def _extract_scope(self, state: Dict[str, Any]) -> Dict[str, Any]:
      """Return the retrieval scope for ``state`` (delegates to ``extract_scope``)."""
      scope = extract_scope(state)
      return scope
  @staticmethod
  def _has_reliable_scope(scope: Dict[str, Any]) -> bool:
      """Tell whether ``scope`` is reliable enough to restrict a search.

      Delegates to ``has_reliable_scope``.
      """
      reliable = has_reliable_scope(scope)
      return reliable
  def _build_filter_expr(self, scope: Dict[str, Any]) -> str:
      """Build the filter expression for ``scope``.

      Delegates to ``build_filter_expr``; the recall paths of this class pass
      the result to Milvus as the search / query filter.
      """
      expr = build_filter_expr(scope)
      return expr
  def _build_tag_expr(self, tag_terms: List[str]) -> str:
      """Build the tag-matching expression for ``tag_terms``.

      Delegates to ``build_tag_expr`` with ``config.tag_terms_limit``.
      """
      term_limit = self.config.tag_terms_limit
      return build_tag_expr(tag_terms, term_limit)
  def _select_tag_terms(self, keywords: List[str]) -> List[str]:
      """Pick the tag terms to search for from ``keywords``.

      Delegates to ``select_tag_terms`` with the configured term limit and
      the configured generic / priority term lists.
      """
      settings = self.config
      return select_tag_terms(
          keywords,
          settings.tag_terms_limit,
          generic_terms=settings.tag_generic_terms,
          priority_terms=settings.tag_priority_terms,
      )
  @staticmethod
  def _metadata_matches_scope(metadata: Dict[str, Any], scope: Dict[str, Any]) -> bool:
      """Tell whether candidate ``metadata`` is compatible with ``scope``.

      Delegates to ``metadata_matches_scope``.
      """
      compatible = metadata_matches_scope(metadata, scope)
      return compatible
  519. # ============================================================
  520. # Metadata 处理
  521. # ============================================================
  def _normalize_row_metadata(self, row_or_metadata: Any) -> Dict[str, Any]:
      """Normalise a row (or a bare metadata value) into a metadata dict.

      Delegates to ``normalize_row_metadata`` with ``PARENT_OUTPUT_FIELDS``.
      """
      known_fields = self.PARENT_OUTPUT_FIELDS
      return normalize_row_metadata(row_or_metadata, known_fields)
  @staticmethod
  def _normalize_metadata(metadata: Any) -> Dict[str, Any]:
      """Convert ``metadata`` into a dict (delegates to ``normalize_metadata``)."""
      as_dict = normalize_metadata(metadata)
      return as_dict
  @staticmethod
  def _metadata_value(metadata: Dict[str, Any], key: str) -> Any:
      """Look up ``key`` in ``metadata`` (delegates to ``metadata_value``)."""
      found = metadata_value(metadata, key)
      return found
  def _build_candidate_key(self, metadata: Dict[str, Any], text: Any = "") -> str:
      """Build the identifying key of a candidate.

      Delegates to ``build_candidate_key`` with ``PARENT_OUTPUT_FIELDS``.
      """
      known_fields = self.PARENT_OUTPUT_FIELDS
      return build_candidate_key(metadata, text, known_fields)
  def _merge_metadata(self, current: Dict[str, Any], incoming: Dict[str, Any]) -> None:
      """Merge ``incoming`` into ``current`` via ``merge_metadata``.

      Nothing is returned, so the helper presumably updates ``current`` in
      place — confirm in ``merge_metadata``.
      """
      merge_metadata(current, incoming)
      return None
  539. # ============================================================
  540. # 加分计算
  541. # ============================================================
  def _calc_tag_bonus(self, candidate: Dict[str, Any], keywords: List[str]) -> float:
      """Compute the tag-match bonus of ``candidate`` for ``keywords``.

      Delegates to ``calc_tag_bonus`` with this service's configuration.
      """
      bonus = calc_tag_bonus(candidate, keywords, self.config)
      return bonus
  545. # ============================================================
  546. # 候选清理
  547. # ============================================================
  def _clean_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
      """Clean the fused candidate list.

      Delegates to ``clean_candidates`` with this service's configuration.
      Per the original notes the helper presumably drops too-short text and
      de-duplicates by candidate key and by content hash — confirm in the
      helper.
      """
      kept = clean_candidates(candidates, self.config)
      return kept
  555. # ============================================================
  556. # 空结果/告警
  557. # ============================================================
  558. def _empty_result(
  559. self,
  560. status: str,
  561. warnings: List[str],
  562. retrieval_method: str = "",
  563. retrieval_scope: Optional[Dict[str, Any]] = None,
  564. retrieval_steps: Optional[List[Dict[str, Any]]] = None,
  565. ) -> Dict[str, Any]:
  566. """构建空召回结果。"""
  567. return {
  568. "retrieval_candidates": [],
  569. "retrieval_steps": retrieval_steps or [],
  570. "retrieval_status": status,
  571. "retrieval_method": retrieval_method,
  572. "retrieval_metrics": {
  573. "recall_count": 0,
  574. "retrieval_method": retrieval_method,
  575. "scope": {key: value for key, value in (retrieval_scope or {}).items() if value},
  576. },
  577. "warnings": warnings,
  578. }
  579. def _warning(self, key: str) -> str:
  580. """获取指定键的告警文案。"""
  581. warnings = self.config.warnings or default_warnings()
  582. return warnings.get(key) or ""