retrieval_service.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134
  1. # -*- coding: utf-8 -*-
  2. """质量优先的多路向量检索服务。
  3. 四路召回架构:
  4. 1. parent_vector:父表向量检索(主体内容向量)
  5. 2. child_locator:子表向量定位 → 反查父行(精确定位片段)
  6. 3. tag_keyword:标签关键词匹配(设备型号、标准号等)
  7. 4. chapter_similarity:章节相似度检索(同类型章节参考)
  8. 合并策略:
  9. - RRF(Reciprocal Rank Fusion)融合多路排名
  10. - 按路径加权:parent_vector 1.0, child_locator 0.8, tag 1.2, chapter 0.5
  11. - 多源加分:同一条候选在多个路径中被召回时额外加分
  12. - 标签匹配加分:关键词出现在 tag_list 或文本中时额外加分
  13. - Scope 匹配加分:与当前项目/章节范围一致时额外加分
  14. 去重策略:
  15. - candidate_key 去重(基于 document_id + parent_id + chunk_id)
  16. - 内容哈希去重(同一文件同一文本内容仅保留一条)
  17. """
  18. from __future__ import annotations
  19. from dataclasses import dataclass
  20. from hashlib import md5
  21. from pathlib import Path
  22. import re
  23. from typing import Any, Callable, Dict, List, Optional
  24. import yaml
  25. from core.document_chat.component.document_chat_logger import document_chat_logger as logger
  26. PROJECT_ROOT = Path(__file__).resolve().parents[3]
  27. RETRIEVAL_CONFIG_PATH = PROJECT_ROOT / "config" / "document_chat_retrieval.yaml"
@dataclass(frozen=True)
class RetrievalConfig:
    """Immutable retrieval configuration; each group of fields is described inline."""

    enabled: bool = True
    parent_collection: str = "t_kngs_construction_plan_parent"
    child_collection: str = "t_kngs_construction_plan_child"
    # Recall limits for each path
    parent_recall_top_k: int = 30
    child_recall_top_k: int = 40
    tag_recall_top_k: int = 30
    chapter_recall_top_k: int = 15
    recall_top_k: int = 30
    rerank_top_k: int = 8
    submit_top_k: int = 3  # upper bound on the references finally placed in the LLM prompt
    # Quality thresholds
    min_vector_similarity: float = 0.45
    min_rerank_score: float = 0.65  # rerank quality gate; results below it are filtered out
    min_qualified_count: int = 1
    # Reference-content length limits
    max_reference_chars: int = 4000  # total character cap across all references
    max_single_reference_chars: int = 1500  # character cap for a single reference
    # Fallback policy
    allow_vector_fallback: bool = False
    allow_unscoped_search: bool = False
    # Hybrid-search weights (dense + sparse vector fusion)
    dense_weight: float = 0.7
    sparse_weight: float = 0.3
    child_dense_weight: float = 0.6
    child_sparse_weight: float = 0.4
    ranker_type: str = "weighted"
    # Tag recall
    tag_recall_enabled: bool = True
    tag_terms_limit: int = 8
    # RRF parameter
    rrf_k: int = 60
    # Per-path weights
    parent_vector_weight: float = 1.0
    child_locator_weight: float = 0.8
    tag_weight: float = 1.2
    chapter_similarity_weight: float = 0.5
    # Bonus terms
    tag_exact_bonus: float = 0.08
    tag_partial_bonus: float = 0.03
    multi_source_bonus: float = 0.02
    scope_bonus: float = 0.03
    warnings: Optional[Dict[str, str]] = None
  74. def load_retrieval_config() -> RetrievalConfig:
  75. """从 YAML 配置文件加载检索参数,文件不存在时使用默认值。"""
  76. if not RETRIEVAL_CONFIG_PATH.exists():
  77. return RetrievalConfig(warnings=_default_warnings())
  78. with open(RETRIEVAL_CONFIG_PATH, "r", encoding="utf-8") as handle:
  79. raw = yaml.safe_load(handle) or {}
  80. retrieval = raw.get("retrieval") or {}
  81. warnings = raw.get("warnings") or _default_warnings()
  82. return RetrievalConfig(
  83. enabled=bool(retrieval.get("enabled", True)),
  84. parent_collection=str(retrieval.get("parent_collection") or "t_kngs_construction_plan_parent"),
  85. child_collection=str(retrieval.get("child_collection") or "t_kngs_construction_plan_child"),
  86. parent_recall_top_k=_to_int(retrieval.get("parent_recall_top_k"), 30),
  87. child_recall_top_k=_to_int(retrieval.get("child_recall_top_k"), 40),
  88. tag_recall_top_k=_to_int(retrieval.get("tag_recall_top_k"), 30),
  89. chapter_recall_top_k=_to_int(retrieval.get("chapter_recall_top_k"), 15),
  90. recall_top_k=_to_int(retrieval.get("recall_top_k"), 30),
  91. rerank_top_k=_to_int(retrieval.get("rerank_top_k"), 8),
  92. submit_top_k=_to_int(retrieval.get("submit_top_k"), 3),
  93. min_vector_similarity=_to_float(retrieval.get("min_vector_similarity"), 0.45),
  94. min_rerank_score=_to_float(retrieval.get("min_rerank_score"), 0.65),
  95. min_qualified_count=_to_int(retrieval.get("min_qualified_count"), 1),
  96. max_reference_chars=_to_int(retrieval.get("max_reference_chars"), 4000),
  97. max_single_reference_chars=_to_int(retrieval.get("max_single_reference_chars"), 1500),
  98. allow_vector_fallback=bool(retrieval.get("allow_vector_fallback", False)),
  99. allow_unscoped_search=bool(retrieval.get("allow_unscoped_search", False)),
  100. dense_weight=_to_float(retrieval.get("dense_weight"), 0.7),
  101. sparse_weight=_to_float(retrieval.get("sparse_weight"), 0.3),
  102. child_dense_weight=_to_float(retrieval.get("child_dense_weight"), 0.6),
  103. child_sparse_weight=_to_float(retrieval.get("child_sparse_weight"), 0.4),
  104. ranker_type=str(retrieval.get("ranker_type") or "weighted"),
  105. tag_recall_enabled=bool(retrieval.get("tag_recall_enabled", True)),
  106. tag_terms_limit=_to_int(retrieval.get("tag_terms_limit"), 8),
  107. rrf_k=_to_int(retrieval.get("rrf_k"), 60),
  108. parent_vector_weight=_to_float(retrieval.get("parent_vector_weight"), 1.0),
  109. child_locator_weight=_to_float(retrieval.get("child_locator_weight"), 0.8),
  110. tag_weight=_to_float(retrieval.get("tag_weight"), 1.2),
  111. chapter_similarity_weight=_to_float(retrieval.get("chapter_similarity_weight"), 0.5),
  112. tag_exact_bonus=_to_float(retrieval.get("tag_exact_bonus"), 0.08),
  113. tag_partial_bonus=_to_float(retrieval.get("tag_partial_bonus"), 0.03),
  114. multi_source_bonus=_to_float(retrieval.get("multi_source_bonus"), 0.02),
  115. scope_bonus=_to_float(retrieval.get("scope_bonus"), 0.03),
  116. warnings=warnings,
  117. )
  118. class DocumentChatRetrievalService:
  119. """构建检索查询,从向量库召回高质量候选。
  120. 核心流程:
  121. 1. build_query:将用户输入、章节信息、意图拼接为检索 query
  122. 2. recall:执行多路召回 → RRF 合并 → 去重
  123. """
    # Output fields requested when querying the parent collection
    PARENT_OUTPUT_FIELDS = [
        "pk", "text", "document_id", "parent_id", "index", "tag_list",
        "metadata", "file_name", "chapter_title",
        "chapter_level_1", "chapter_level_2", "chapter_level_3",
    ]
    # Output fields requested when querying the child collection
    # (currently the same field list as the parent collection)
    CHILD_OUTPUT_FIELDS = [
        "pk", "text", "document_id", "parent_id", "index", "tag_list",
        "metadata", "file_name", "chapter_title",
        "chapter_level_1", "chapter_level_2", "chapter_level_3",
    ]
  136. def __init__(self, config: Optional[RetrievalConfig] = None):
  137. self.config = config or load_retrieval_config()
  138. # ============================================================
  139. # Query 构建
  140. # ============================================================
  141. def build_query(self, state: Dict[str, Any]) -> str:
  142. """构建精炼检索 query,避免冗余的项目摘要。
  143. 拼接内容:
  144. - 用户原始输入
  145. - 意图识别后的规范化指令
  146. - 当前选中章节编号 + 标题
  147. - 提取的关键词(最多 8 个)
  148. 去重后截取 120 字符。
  149. """
  150. selected_section = state.get("selected_section") or {}
  151. intent_result = state.get("intent_result") or {}
  152. keywords = self.build_query_keywords(state)
  153. parts = [
  154. state.get("user_message") or "",
  155. intent_result.get("normalized_instruction") or "",
  156. f"{selected_section.get('index', '')} {selected_section.get('title', '')}".strip(),
  157. " ".join(keywords[:8]),
  158. ]
  159. return _dedupe_join(parts, max_chars=120)
  160. def build_query_keywords(self, state: Dict[str, Any], query: Optional[str] = None) -> List[str]:
  161. """从多来源提取检索关键词。
  162. 来源优先级:
  163. 1. 用户输入
  164. 2. 意图规范化指令
  165. 3. 章节编号 + 标题
  166. 4. 章节正文内容(前 500 字)
  167. 5. 已拼接的 query
  168. 6. 历史对话中用户消息(排除 AI 回复,防止助手建议污染检索)
  169. 关键词提取规则见 _extract_retrieval_keywords。
  170. """
  171. selected_section = state.get("selected_section") or {}
  172. intent_result = state.get("intent_result") or {}
  173. history = state.get("conversation_history") or []
  174. sources = [
  175. state.get("user_message") or "",
  176. intent_result.get("normalized_instruction") or "",
  177. f"{selected_section.get('index', '')} {selected_section.get('title', '')}",
  178. str(selected_section.get("content") or "")[:500],
  179. query or "",
  180. ]
  181. if history:
  182. for turn in history[-6:]:
  183. if not isinstance(turn, dict):
  184. continue
  185. role = str(turn.get("role") or turn.get("sender") or "").lower()
  186. # 仅取用户消息,跳过 AI 助手回复
  187. if role in ("assistant", "ai", "bot", "model"):
  188. continue
  189. content = str(turn.get("content") or turn.get("message") or "")
  190. if content:
  191. sources.append(content)
  192. keywords: List[str] = []
  193. seen = set()
  194. for text in sources:
  195. for keyword in _extract_retrieval_keywords(str(text or "")):
  196. normalized = keyword.strip()
  197. if not normalized or normalized in seen:
  198. continue
  199. seen.add(normalized)
  200. keywords.append(normalized)
  201. if len(keywords) >= 20:
  202. return keywords
  203. return keywords
  204. # ============================================================
  205. # 主召回入口
  206. # ============================================================
  207. def recall(self, state: Dict[str, Any]) -> Dict[str, Any]:
  208. """执行多路向量召回,RRF 合并,去重过滤。
  209. 返回:
  210. - retrieval_candidates:去重后的候选列表
  211. - retrieval_status:recalled / no_scope / no_recall / disabled
  212. - retrieval_metrics:各路径召回统计
  213. - retrieval_steps:每步详细日志
  214. """
  215. if not self.config.enabled:
  216. return self._empty_result("disabled", [], retrieval_method="disabled")
  217. query = str(state.get("retrieval_query") or "").strip()
  218. if not query:
  219. return self._empty_result("no_recall", [self._warning("no_recall")], retrieval_method="empty_query")
  220. # 提取检索范围(项目ID、工程类型、章节分类等)
  221. scope = self._extract_scope(state)
  222. if not self._has_reliable_scope(scope) and not self.config.allow_unscoped_search:
  223. return self._empty_result(
  224. "no_scope",
  225. [self._warning("no_scope")],
  226. retrieval_method="no_scope",
  227. retrieval_scope=scope,
  228. )
  229. keywords = list(state.get("retrieval_keywords") or self.build_query_keywords(state, query))
  230. retrieval_steps: List[Dict[str, Any]] = []
  231. source_results: Dict[str, List[Dict[str, Any]]] = {}
  232. # ===== 四路召回 =====
  233. source_results["parent_vector"] = self._run_recall_path(
  234. "parent_vector",
  235. lambda: self._recall_by_parent_vector(scope, query),
  236. retrieval_steps,
  237. query=query,
  238. scope=scope,
  239. )
  240. source_results["child_locator"] = self._run_recall_path(
  241. "child_locator",
  242. lambda: self._recall_by_child_locator(scope, query),
  243. retrieval_steps,
  244. query=query,
  245. scope=scope,
  246. )
  247. if self.config.tag_recall_enabled:
  248. source_results["tag"] = self._run_recall_path(
  249. "tag",
  250. lambda: self._recall_by_tag(scope, keywords),
  251. retrieval_steps,
  252. query=" ".join(keywords[: self.config.tag_terms_limit]),
  253. scope=scope,
  254. )
  255. if scope.get("chapter_level_1") and scope.get("chapter_level_2"):
  256. source_results["chapter_similarity"] = self._run_recall_path(
  257. "chapter_similarity",
  258. lambda: self._recall_by_chapter(scope, query),
  259. retrieval_steps,
  260. query=query,
  261. scope=scope,
  262. )
  263. # ===== RRF 合并 + 去重 =====
  264. merged_candidates = self._merge_recall_results(source_results, scope, keywords)
  265. cleaned = self._clean_candidates(merged_candidates)
  266. retrieval_steps.append(
  267. {
  268. "step": "rrf_merge",
  269. "query": query,
  270. "scope": {key: value for key, value in scope.items() if value},
  271. "count": len(merged_candidates),
  272. "items": _pack_log_items(merged_candidates),
  273. }
  274. )
  275. retrieval_steps.append(
  276. {
  277. "step": "clean_candidates",
  278. "count": len(cleaned),
  279. "items": _pack_log_items(cleaned),
  280. }
  281. )
  282. if not cleaned:
  283. return self._empty_result(
  284. "no_recall",
  285. [self._warning("no_recall")],
  286. retrieval_method="multi_path_rrf",
  287. retrieval_scope=scope,
  288. retrieval_steps=retrieval_steps,
  289. )
  290. source_counts = {source: len(items or []) for source, items in source_results.items()}
  291. # 日志:区分请求的 scope、实际应用的过滤、实际召回的文件
  292. applied_expr = self._build_filter_expr(scope)
  293. actual_files = list(dict.fromkeys(
  294. str(item.get("source", ""))[:40]
  295. for item in cleaned
  296. if item.get("source")
  297. ))[:5]
  298. logger.info(
  299. f"[DocumentChat] recall completed: method=multi_path_rrf "
  300. f"requested_scope={dict((k, v) for k, v in scope.items() if v)} "
  301. f"applied_filter='{applied_expr}' "
  302. f"actual_sources={actual_files} "
  303. f"source_counts={source_counts} "
  304. f"total={len(cleaned)} max_sim={max((item.get('vector_similarity', 0.0) for item in cleaned), default=0.0):.4f}"
  305. )
  306. metrics = {
  307. "recall_count": len(cleaned),
  308. "merged_count": len(merged_candidates),
  309. "source_counts": source_counts,
  310. "max_vector_similarity": max((item.get("vector_similarity", 0.0) for item in cleaned), default=0.0),
  311. "max_fusion_score": max((item.get("fusion_score", 0.0) for item in cleaned), default=0.0),
  312. "scope": {key: value for key, value in scope.items() if value},
  313. "retrieval_method": "multi_path_rrf",
  314. }
  315. return {
  316. "retrieval_candidates": cleaned,
  317. "retrieval_steps": retrieval_steps,
  318. "retrieval_status": "recalled",
  319. "retrieval_method": "multi_path_rrf",
  320. "retrieval_metrics": metrics,
  321. "warnings": [],
  322. }
  323. def _run_recall_path(
  324. self,
  325. step: str,
  326. func: Callable[[], List[Dict[str, Any]]],
  327. retrieval_steps: List[Dict[str, Any]],
  328. query: str,
  329. scope: Dict[str, Any],
  330. ) -> List[Dict[str, Any]]:
  331. """执行单路召回,异常时不阻断其他路径。"""
  332. try:
  333. candidates = func() or []
  334. retrieval_steps.append(
  335. {
  336. "step": step,
  337. "query": query,
  338. "scope": {key: value for key, value in scope.items() if value},
  339. "count": len(candidates),
  340. "items": _pack_log_items(candidates),
  341. }
  342. )
  343. return candidates
  344. except Exception as exc:
  345. logger.warning(f"[DocumentChat] {step} recall failed: {exc}", exc_info=True)
  346. retrieval_steps.append(
  347. {
  348. "step": step,
  349. "query": query,
  350. "scope": {key: value for key, value in scope.items() if value},
  351. "count": 0,
  352. "error": str(exc),
  353. "items": [],
  354. }
  355. )
  356. return []
  357. # ============================================================
  358. # 四路召回具体实现
  359. # ============================================================
  360. def _recall_by_parent_vector(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
  361. """父表向量检索:Milvus 混合搜索(dense + sparse),直接返回父表文档。"""
  362. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  363. expr = self._build_filter_expr(scope)
  364. results = MilvusVectorManager().hybrid_search(
  365. param={"collection_name": self.config.parent_collection, "expr": expr},
  366. query_text=query,
  367. top_k=self.config.parent_recall_top_k,
  368. ranker_type=self.config.ranker_type,
  369. dense_weight=self.config.dense_weight,
  370. sparse_weight=self.config.sparse_weight,
  371. )
  372. return [
  373. self._candidate_from_vector_row(row, "parent_vector", scope)
  374. for row in results
  375. if str(row.get("text_content") or "").strip()
  376. ]
  377. def _recall_by_child_locator(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
  378. """子表向量定位 + 父表反查:先用 query 在子表中找到匹配片段,
  379. 再通过 parent_id 反查父表行,获取完整的父文档内容。
  380. 优势:子表粒度更细,能精确定位到段落级别,然后拉取对应父文档的完整内容。
  381. """
  382. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  383. expr = self._build_filter_expr(scope)
  384. child_rows = MilvusVectorManager().hybrid_search(
  385. param={"collection_name": self.config.child_collection, "expr": expr},
  386. query_text=query,
  387. top_k=self.config.child_recall_top_k,
  388. ranker_type=self.config.ranker_type,
  389. dense_weight=self.config.child_dense_weight,
  390. sparse_weight=self.config.child_sparse_weight,
  391. )
  392. # 按 parent_id 分组子表命中结果
  393. child_groups: Dict[str, List[Dict[str, Any]]] = {}
  394. for row in child_rows:
  395. metadata = self._normalize_row_metadata(row.get("metadata") or {})
  396. parent_id = str(self._metadata_value(metadata, "parent_id") or "").strip()
  397. if not parent_id:
  398. continue
  399. child_groups.setdefault(parent_id, []).append(row)
  400. # 通过 parent_id 反查父表
  401. parent_rows = self._fetch_parent_rows_by_parent_ids(list(child_groups.keys()), scope)
  402. candidates = []
  403. for parent_row in parent_rows:
  404. parent_id = str(parent_row.get("parent_id") or "").strip()
  405. matches = child_groups.get(parent_id) or []
  406. max_similarity = max((_to_float(item.get("similarity"), 0.0) for item in matches), default=0.0)
  407. candidate = self._candidate_from_parent_row(parent_row, "child_locator", scope, max_similarity)
  408. metadata = candidate.setdefault("metadata", {})
  409. metadata["child_hit_count"] = len(matches) # 子表命中次数
  410. metadata["matched_child_texts"] = [
  411. str(item.get("text_content") or "").strip()
  412. for item in matches[:5]
  413. if str(item.get("text_content") or "").strip()
  414. ]
  415. candidates.append(candidate)
  416. return candidates
  417. def _recall_by_tag(self, scope: Dict[str, Any], keywords: List[str]) -> List[Dict[str, Any]]:
  418. """标签关键词召回:从关键词中筛选标准号、设备名等专业术语,
  419. 在 tag_list 字段上做 LIKE 匹配。
  420. 注意:标签召回容易过度匹配,因此结果相似度乘以 0.7 打折。
  421. """
  422. tag_terms = self._select_tag_terms(keywords)
  423. if not tag_terms:
  424. return []
  425. tag_expr = self._build_tag_expr(tag_terms)
  426. scope_expr = self._build_filter_expr(scope)
  427. expr = _combine_expr(scope_expr, tag_expr)
  428. # 父表标签匹配
  429. parent_rows = self._condition_query(
  430. collection_name=self.config.parent_collection,
  431. filter_expr=expr,
  432. output_fields=self.PARENT_OUTPUT_FIELDS,
  433. limit=self.config.tag_recall_top_k,
  434. )
  435. candidates = [
  436. self._candidate_from_parent_row(row, "tag", scope, self.config.min_vector_similarity)
  437. for row in parent_rows
  438. ]
  439. # 子表标签匹配,再反查父行
  440. child_rows = self._condition_query(
  441. collection_name=self.config.child_collection,
  442. filter_expr=expr,
  443. output_fields=self.CHILD_OUTPUT_FIELDS,
  444. limit=self.config.tag_recall_top_k,
  445. )
  446. child_parent_ids = []
  447. child_tag_map: Dict[str, List[str]] = {}
  448. for row in child_rows:
  449. parent_id = str(row.get("parent_id") or self._metadata_value(row, "parent_id") or "").strip()
  450. if not parent_id:
  451. continue
  452. child_parent_ids.append(parent_id)
  453. text = str(row.get("text") or "").strip()
  454. if text:
  455. child_tag_map.setdefault(parent_id, []).append(text)
  456. for row in self._fetch_parent_rows_by_parent_ids(child_parent_ids, scope):
  457. parent_id = str(row.get("parent_id") or "").strip()
  458. candidate = self._candidate_from_parent_row(row, "tag", scope, self.config.min_vector_similarity)
  459. metadata = candidate.setdefault("metadata", {})
  460. metadata["matched_child_texts"] = child_tag_map.get(parent_id, [])[:5]
  461. candidates.append(candidate)
  462. # 标签结果打折,防止过度匹配
  463. for candidate in candidates:
  464. candidate["vector_similarity"] *= 0.7
  465. # 记录匹配的标签术语
  466. for candidate in candidates:
  467. metadata = candidate.setdefault("metadata", {})
  468. tag_text = " ".join(
  469. str(value or "")
  470. for value in (
  471. metadata.get("tag_list"),
  472. candidate.get("text"),
  473. " ".join(metadata.get("matched_child_texts") or []),
  474. )
  475. )
  476. metadata["tag_match_terms"] = [term for term in tag_terms if term and term in tag_text]
  477. return candidates
  478. def _recall_by_chapter(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
  479. """章节相似度检索:调用现有 similar_fragment_service,
  480. 按 chapter_level_1 + chapter_level_2 限定范围搜索相似片段。
  481. """
  482. from core.construction_write.component.similar_fragment_service import search_similar_fragments
  483. rows = search_similar_fragments(
  484. level1=str(scope.get("chapter_level_1") or ""),
  485. level2=str(scope.get("chapter_level_2") or ""),
  486. search_text=query,
  487. top_k=self.config.chapter_recall_top_k,
  488. )
  489. candidates = []
  490. for row in rows:
  491. metadata = {
  492. "tenant_id": scope.get("tenant_id") or "",
  493. "project_id": scope.get("project_id") or "",
  494. "knowledge_base_id": scope.get("knowledge_base_id") or "",
  495. "file_name": row.get("file_name") or "",
  496. "chapter_level_1": row.get("chapter_level_1") or scope.get("chapter_level_1") or "",
  497. "chapter_level_2": row.get("chapter_level_2") or scope.get("chapter_level_2") or "",
  498. "parent_count": row.get("parent_count", 0),
  499. "source_scope_valid": True, # 通过章节分类限定,天然 scope 匹配
  500. }
  501. text = str(row.get("text") or "").strip()
  502. candidates.append(
  503. {
  504. "candidate_key": self._build_candidate_key({**row, "metadata": metadata}, text),
  505. "text": text,
  506. "source": metadata.get("file_name") or "向量知识库",
  507. "vector_similarity": _to_float(row.get("similarity"), 0.0),
  508. "fusion_score": 0.0,
  509. "metadata": metadata,
  510. "source_hits": {},
  511. "retrieval_source": "chapter_similarity",
  512. }
  513. )
  514. return candidates
  515. # ============================================================
  516. # RRF 合并
  517. # ============================================================
  518. def _merge_recall_results(
  519. self,
  520. source_results: Dict[str, List[Dict[str, Any]]],
  521. scope: Dict[str, Any],
  522. keywords: List[str],
  523. ) -> List[Dict[str, Any]]:
  524. """多路召回结果 RRF 融合合并。
  525. 融合分数计算:
  526. - 基础分:weight / (rrf_k + rank),按路径权重和排名计算
  527. - 多源加分:同一条候选在多个路径中被召回时额外加分
  528. - Scope 加分:与当前项目范围一致时额外加分
  529. - 标签加分:关键词出现在候选文本中时额外加分
  530. """
  531. weights = {
  532. "parent_vector": self.config.parent_vector_weight,
  533. "child_locator": self.config.child_locator_weight,
  534. "tag": self.config.tag_weight,
  535. "chapter_similarity": self.config.chapter_similarity_weight,
  536. }
  537. merged: Dict[str, Dict[str, Any]] = {}
  538. for source, candidates in source_results.items():
  539. weight = weights.get(source, 0.0)
  540. for rank, item in enumerate(candidates or [], start=1):
  541. key = str(item.get("candidate_key") or self._build_candidate_key(item, item.get("text")))
  542. if not key:
  543. continue
  544. if key not in merged:
  545. candidate = dict(item)
  546. candidate["candidate_key"] = key
  547. candidate["source_hits"] = {}
  548. candidate["fusion_score"] = 0.0
  549. merged[key] = candidate
  550. current = merged[key]
  551. # RRF 公式:累加 weight / (rrf_k + rank)
  552. current["fusion_score"] = _to_float(current.get("fusion_score"), 0.0) + weight / (self.config.rrf_k + rank)
  553. current["vector_similarity"] = max(
  554. _to_float(current.get("vector_similarity"), 0.0),
  555. _to_float(item.get("vector_similarity"), 0.0),
  556. )
  557. current.setdefault("source_hits", {})[source] = {
  558. "rank": rank,
  559. "vector_similarity": _to_float(item.get("vector_similarity"), 0.0),
  560. }
  561. self._merge_metadata(current, item)
  562. # 额外加分
  563. for candidate in merged.values():
  564. source_hits = candidate.get("source_hits") if isinstance(candidate.get("source_hits"), dict) else {}
  565. metadata = candidate.get("metadata") if isinstance(candidate.get("metadata"), dict) else {}
  566. if len(source_hits) > 1:
  567. candidate["fusion_score"] += self.config.multi_source_bonus
  568. if self._metadata_matches_scope(metadata, scope):
  569. candidate["fusion_score"] += self.config.scope_bonus
  570. candidate["fusion_score"] += self._calc_tag_bonus(candidate, keywords)
  571. return sorted(merged.values(), key=lambda item: item.get("fusion_score", 0.0), reverse=True)[: self.config.recall_top_k]
  572. # ============================================================
  573. # Milvus 查询辅助
  574. # ============================================================
  575. def _fetch_parent_rows_by_parent_ids(self, parent_ids: List[str], scope: Dict[str, Any]) -> List[Dict[str, Any]]:
  576. """根据 parent_id 列表反查父表行,去重后逐条查询。"""
  577. unique_ids = []
  578. seen = set()
  579. for parent_id in parent_ids:
  580. value = str(parent_id or "").strip()
  581. if value and value not in seen:
  582. seen.add(value)
  583. unique_ids.append(value)
  584. rows: List[Dict[str, Any]] = []
  585. scope_expr = self._build_filter_expr(scope)
  586. for parent_id in unique_ids[: self.config.recall_top_k]:
  587. parent_expr = f"parent_id == '{_escape_milvus_string(parent_id)}'"
  588. expr = _combine_expr(parent_expr, scope_expr)
  589. rows.extend(
  590. self._condition_query(
  591. collection_name=self.config.parent_collection,
  592. filter_expr=expr,
  593. output_fields=self.PARENT_OUTPUT_FIELDS,
  594. limit=100,
  595. )
  596. )
  597. return rows
  598. def _condition_query(
  599. self,
  600. collection_name: str,
  601. filter_expr: str,
  602. output_fields: List[str],
  603. limit: int,
  604. ) -> List[Dict[str, Any]]:
  605. """Milvus 条件查询(非向量),按 filter 表达式筛选文档。"""
  606. from core.construction_write.component.similar_fragment_service import get_milvus_manager
  607. if not filter_expr:
  608. return []
  609. return get_milvus_manager().condition_query(
  610. collection_name=collection_name,
  611. filter=filter_expr,
  612. output_fields=output_fields,
  613. limit=limit,
  614. )
  615. # ============================================================
  616. # 候选构建
  617. # ============================================================
  618. def _candidate_from_vector_row(self, row: Dict[str, Any], source: str, scope: Dict[str, Any]) -> Dict[str, Any]:
  619. """从 Milvus 混合搜索结果行构建标准候选。"""
  620. metadata = self._normalize_row_metadata(row.get("metadata") or {})
  621. text = str(row.get("text_content") or row.get("text") or "").strip()
  622. metadata["source_scope_valid"] = self._metadata_matches_scope(metadata, scope)
  623. return {
  624. "candidate_key": self._build_candidate_key(metadata, text),
  625. "text": text,
  626. "source": metadata.get("file_name") or metadata.get("title") or "向量知识库",
  627. "vector_similarity": _to_float(row.get("similarity"), 0.0),
  628. "fusion_score": 0.0,
  629. "metadata": metadata,
  630. "source_hits": {},
  631. "retrieval_source": source,
  632. }
  633. def _candidate_from_parent_row(
  634. self,
  635. row: Dict[str, Any],
  636. source: str,
  637. scope: Dict[str, Any],
  638. vector_similarity: float,
  639. ) -> Dict[str, Any]:
  640. """从父表行构建标准候选。"""
  641. metadata = self._normalize_row_metadata(row)
  642. text = str(row.get("text") or "").strip()
  643. metadata["source_scope_valid"] = self._metadata_matches_scope(metadata, scope)
  644. return {
  645. "candidate_key": self._build_candidate_key(metadata, text),
  646. "text": text,
  647. "source": metadata.get("file_name") or "向量知识库",
  648. "vector_similarity": _to_float(vector_similarity, 0.0),
  649. "fusion_score": 0.0,
  650. "metadata": metadata,
  651. "source_hits": {},
  652. "retrieval_source": source,
  653. }
  654. # ============================================================
  655. # Scope 提取与过滤
  656. # ============================================================
  657. def _extract_scope(self, state: Dict[str, Any]) -> Dict[str, Any]:
  658. """从工作流状态中提取检索范围信息。
  659. 按优先级从 selected_section、document_context、project_info、retrieval_filters
  660. 中查找字段值,兼容多种字段命名。
  661. """
  662. selected = state.get("selected_section") or {}
  663. context = state.get("document_context") or {}
  664. project = state.get("project_info") or {}
  665. filters = context.get("retrieval_filters") if isinstance(context.get("retrieval_filters"), dict) else {}
  666. filters = filters or project.get("retrieval_filters") if isinstance(project.get("retrieval_filters"), dict) else filters
  667. def pick(*keys: str) -> str:
  668. for source in (selected, context, project, filters or {}):
  669. for key in keys:
  670. value = source.get(key) if isinstance(source, dict) else None
  671. if value not in (None, ""):
  672. return str(value).strip()
  673. return ""
  674. return {
  675. "tenant_id": pick("tenant_id"),
  676. "project_id": pick("project_id"),
  677. "knowledge_base_id": pick("knowledge_base_id", "kb_id"),
  678. "engineering_type": pick("engineering_type", "project_type"),
  679. "plan_type": pick("plan_type"),
  680. "chapter_level_1": pick("chapter_level_1", "level1"),
  681. "chapter_level_2": pick("chapter_level_2", "level2"),
  682. "chapter_level_3": pick("chapter_level_3", "level3"),
  683. }
  684. @staticmethod
  685. def _has_reliable_scope(scope: Dict[str, Any]) -> bool:
  686. """判断是否有足够可靠的 scope 用于限定检索范围。"""
  687. if scope.get("chapter_level_1") and scope.get("chapter_level_2"):
  688. return True
  689. return bool(scope.get("plan_type"))
  690. def _build_filter_expr(self, scope: Dict[str, Any]) -> str:
  691. """构建 Milvus 过滤表达式,按章节层级限定检索范围。"""
  692. conditions = []
  693. for key in ("plan_type", "chapter_level_1", "chapter_level_2", "chapter_level_3"):
  694. value = str(scope.get(key) or "").strip()
  695. if value:
  696. conditions.append(f"{key} == '{_escape_milvus_string(value)}'")
  697. return " and ".join(conditions)
  698. def _build_tag_expr(self, tag_terms: List[str]) -> str:
  699. """构建标签 LIKE 查询表达式。"""
  700. conditions = []
  701. for term in tag_terms[: self.config.tag_terms_limit]:
  702. conditions.append(f'tag_list like "%{_escape_milvus_string(term)}%"')
  703. return " or ".join(conditions)
  704. def _select_tag_terms(self, keywords: List[str]) -> List[str]:
  705. """从关键词中筛选高价值标签术语。
  706. 排除:验收、标准、规范等通用词(几乎匹配所有文档)
  707. 优先:标准号(如 TB10212-2012)、设备名(架桥机、龙门吊等)
  708. """
  709. generic_terms = {
  710. "验收", "标准", "规范", "检查", "检测", "试验", "安装", "拆除",
  711. "要求", "安全", "环保", "质量", "进度", "交底",
  712. }
  713. device_terms = {"架桥机", "龙门吊", "吊车", "塔吊", "施工电梯", "挂篮", "支架", "台车"}
  714. selected = []
  715. priority = [] # 标准号和设备名优先
  716. seen = set()
  717. for keyword in keywords:
  718. value = str(keyword or "").strip()
  719. if len(value) < 2 or value in seen:
  720. continue
  721. seen.add(value)
  722. if value in generic_terms:
  723. continue
  724. if re.match(r"[A-Z]{1,3}\d{4,}", value) or value in device_terms:
  725. priority.append(value)
  726. elif len(selected) < self.config.tag_terms_limit:
  727. selected.append(value)
  728. return priority + selected
  729. @staticmethod
  730. def _metadata_matches_scope(metadata: Dict[str, Any], scope: Dict[str, Any]) -> bool:
  731. """检查候选 metadata 是否与当前检索 scope 兼容。
  732. 不要求所有字段都匹配,仅校验 scope 和 metadata 同时存在且不一致的字段。
  733. """
  734. required_keys = ["tenant_id", "project_id", "knowledge_base_id", "chapter_level_1", "chapter_level_2", "chapter_level_3"]
  735. for key in required_keys:
  736. expected = str(scope.get(key) or "").strip()
  737. if not expected:
  738. continue
  739. actual = str(metadata.get(key) or "").strip()
  740. if actual and actual != expected:
  741. return False
  742. return True
  743. # ============================================================
  744. # Metadata 处理
  745. # ============================================================
  746. def _normalize_row_metadata(self, row_or_metadata: Any) -> Dict[str, Any]:
  747. """规范化行数据为统一的 metadata 字典。处理嵌套 metadata 和 YAML 字符串。"""
  748. metadata = self._normalize_metadata(row_or_metadata)
  749. inner = self._normalize_metadata(metadata.get("metadata")) if metadata.get("metadata") else {}
  750. for key, value in inner.items():
  751. metadata.setdefault(key, value)
  752. for key in self.PARENT_OUTPUT_FIELDS:
  753. if isinstance(row_or_metadata, dict) and row_or_metadata.get(key) not in (None, ""):
  754. metadata[key] = row_or_metadata.get(key)
  755. return metadata
  756. @staticmethod
  757. def _normalize_metadata(metadata: Any) -> Dict[str, Any]:
  758. """将 metadata 转为字典,支持 YAML 字符串解析。"""
  759. if isinstance(metadata, dict):
  760. return dict(metadata)
  761. if isinstance(metadata, str) and metadata.strip():
  762. try:
  763. loaded = yaml.safe_load(metadata)
  764. return dict(loaded) if isinstance(loaded, dict) else {}
  765. except Exception:
  766. return {}
  767. return {}
  768. @staticmethod
  769. def _metadata_value(metadata: Dict[str, Any], key: str) -> Any:
  770. """安全获取 metadata 值,支持嵌套 metadata.metadata 和 YAML 字符串。"""
  771. if key in metadata:
  772. return metadata.get(key)
  773. nested = metadata.get("metadata")
  774. if isinstance(nested, dict):
  775. return nested.get(key)
  776. if isinstance(nested, str) and nested.strip():
  777. try:
  778. parsed = yaml.safe_load(nested)
  779. if isinstance(parsed, dict):
  780. return parsed.get(key)
  781. except Exception:
  782. return None
  783. return None
  784. def _build_candidate_key(self, metadata: Dict[str, Any], text: Any = "") -> str:
  785. """构建候选唯一标识键,按优先级尝试不同字段组合。"""
  786. metadata = self._normalize_row_metadata(metadata)
  787. document_id = str(self._metadata_value(metadata, "document_id") or "").strip()
  788. parent_id = str(self._metadata_value(metadata, "parent_id") or "").strip()
  789. chunk_id = str(self._metadata_value(metadata, "chunk_id") or "").strip()
  790. chapter_title = str(self._metadata_value(metadata, "chapter_title") or "").strip()
  791. index = self._metadata_value(metadata, "index")
  792. pk = str(self._metadata_value(metadata, "pk") or "").strip()
  793. if document_id and parent_id and chunk_id:
  794. return f"{document_id}::{parent_id}::{chunk_id}"
  795. if document_id and parent_id and chapter_title and index not in (None, ""):
  796. return f"{document_id}::{parent_id}::{chapter_title}::{index}"
  797. if pk:
  798. return pk
  799. if parent_id and chapter_title and index not in (None, ""):
  800. return f"{parent_id}::{chapter_title}::{index}"
  801. return str(text or "")[:300]
  802. def _merge_metadata(self, current: Dict[str, Any], incoming: Dict[str, Any]) -> None:
  803. """合并两条候选的 metadata,不覆盖已有非空值。"""
  804. current_meta = current.setdefault("metadata", {})
  805. incoming_meta = incoming.get("metadata") if isinstance(incoming.get("metadata"), dict) else {}
  806. for key, value in incoming_meta.items():
  807. if key not in current_meta or current_meta.get(key) in (None, "", []):
  808. current_meta[key] = value
  809. if incoming.get("source") and not current.get("source"):
  810. current["source"] = incoming.get("source")
  811. # ============================================================
  812. # 加分计算
  813. # ============================================================
  814. def _calc_tag_bonus(self, candidate: Dict[str, Any], keywords: List[str]) -> float:
  815. """计算标签匹配加分:关键词精确匹配 tag_list 加分更多,仅出现在文本中加分较少。"""
  816. metadata = candidate.get("metadata") if isinstance(candidate.get("metadata"), dict) else {}
  817. text = " ".join(
  818. str(value or "")
  819. for value in (
  820. candidate.get("text"),
  821. metadata.get("tag_list"),
  822. " ".join(metadata.get("matched_child_texts") or []),
  823. )
  824. )
  825. bonus = 0.0
  826. for keyword in self._select_tag_terms(keywords):
  827. if not keyword:
  828. continue
  829. if keyword in str(metadata.get("tag_list") or ""):
  830. bonus += self.config.tag_exact_bonus
  831. elif keyword in text:
  832. bonus += self.config.tag_partial_bonus
  833. return bonus
  834. # ============================================================
  835. # 候选清理
  836. # ============================================================
  837. def _clean_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  838. """清理候选:过滤过短文本、双重去重(candidate_key + 内容哈希)。
  839. 去重策略:
  840. 1. candidate_key 去重:相同 document+parent+chunk 视为同一条
  841. 2. 内容哈希去重:同一文件同一文本内容(即使路径不同)只保留一条
  842. """
  843. cleaned = []
  844. seen_keys = set()
  845. seen_hashes = set()
  846. for item in candidates:
  847. text = str(item.get("text") or "").strip()
  848. if len(text) < 20:
  849. continue
  850. metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
  851. dedupe_key = str(item.get("candidate_key") or text[:300])
  852. # 内容哈希去重
  853. content_hash = _content_hash(text[:300])
  854. file_name = str(metadata.get("file_name") or "")
  855. hash_key = f"{file_name}::{content_hash}"
  856. if dedupe_key in seen_keys or hash_key in seen_hashes:
  857. continue
  858. seen_keys.add(dedupe_key)
  859. seen_hashes.add(hash_key)
  860. metadata["candidate_key"] = dedupe_key
  861. cleaned.append(
  862. {
  863. "candidate_key": dedupe_key,
  864. "text": text[: self.config.max_single_reference_chars],
  865. "source": str(item.get("source") or metadata.get("file_name") or "向量知识库"),
  866. "vector_similarity": _to_float(item.get("vector_similarity"), 0.0),
  867. "fusion_score": _to_float(item.get("fusion_score"), 0.0),
  868. "source_hits": item.get("source_hits") if isinstance(item.get("source_hits"), dict) else {},
  869. "metadata": metadata,
  870. }
  871. )
  872. cleaned.sort(key=lambda item: (item.get("fusion_score", 0.0), item.get("vector_similarity", 0.0)), reverse=True)
  873. return cleaned[: self.config.recall_top_k]
  874. # ============================================================
  875. # 空结果/告警
  876. # ============================================================
  877. def _empty_result(
  878. self,
  879. status: str,
  880. warnings: List[str],
  881. retrieval_method: str = "",
  882. retrieval_scope: Optional[Dict[str, Any]] = None,
  883. retrieval_steps: Optional[List[Dict[str, Any]]] = None,
  884. ) -> Dict[str, Any]:
  885. """构建空召回结果。"""
  886. return {
  887. "retrieval_candidates": [],
  888. "retrieval_steps": retrieval_steps or [],
  889. "retrieval_status": status,
  890. "retrieval_method": retrieval_method,
  891. "retrieval_metrics": {
  892. "recall_count": 0,
  893. "retrieval_method": retrieval_method,
  894. "scope": {key: value for key, value in (retrieval_scope or {}).items() if value},
  895. },
  896. "warnings": warnings,
  897. }
  898. def _warning(self, key: str) -> str:
  899. """获取指定键的告警文案。"""
  900. warnings = self.config.warnings or _default_warnings()
  901. return warnings.get(key) or ""
  902. def _default_warnings() -> Dict[str, str]:
  903. return {
  904. "no_scope": "缺少可靠的知识库检索范围,本次未引用向量库内容。",
  905. "no_recall": "未召回可信知识库内容,本次回答不引用向量库。",
  906. "low_confidence": "未找到可信度足够的知识库片段,本次未引用向量库内容。",
  907. "rerank_failed": "知识库片段重排不可用,本次未引用向量库内容。",
  908. }
  909. def _escape_milvus_string(value: str) -> str:
  910. """转义 Milvus 字符串中的特殊字符(反斜杠、单引号、双引号)。"""
  911. return str(value).replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')
  912. def _combine_expr(*exprs: str) -> str:
  913. """用 AND 连接多个过滤表达式,每个子表达式加括号。"""
  914. parts = [f"({expr})" for expr in exprs if str(expr or "").strip()]
  915. return " and ".join(parts)
  916. def _dedupe_join(parts: List[str], max_chars: int) -> str:
  917. """去重后拼接文本片段,限制总长度。"""
  918. values = []
  919. seen = set()
  920. for part in parts:
  921. value = re.sub(r"\s+", " ", str(part or "")).strip()
  922. if not value or value in seen:
  923. continue
  924. seen.add(value)
  925. values.append(value)
  926. return " ".join(values)[:max_chars]
  927. def _extract_retrieval_keywords(text: str) -> List[str]:
  928. """从文本中提取检索关键词,支持多种模式:
  929. 1. 标准号/型号:如 TB10212-2012、φ48.3×3.6
  930. 2. 规范名称:《XXX规范》
  931. 3. 领域专业术语:验收、架桥机、箱梁等
  932. 4. 术语+动作组合:XX验收、XX安装
  933. 5. 长词中的领域术语片段
  934. """
  935. if not text:
  936. return []
  937. keywords: List[str] = []
  938. # 模式1:标准号/型号(字母+数字,可选连字符)
  939. for match in re.findall(r"[A-Za-z]{1,8}\s*\d{2,8}(?:[-—]\d{2,4})?", text):
  940. keywords.append(re.sub(r"\s+", "", match).upper())
  941. # 模式2:《XXX》规范名称
  942. for match in re.findall(r"《([^》]{2,40})》", text):
  943. keywords.append(match.strip())
  944. # 模式3:领域专业术语
  945. domain_terms = (
  946. "验收", "标准", "规范", "检查", "检测", "试验", "安装", "拆除", "吊装",
  947. "架桥机", "龙门吊", "吊车", "箱梁", "T梁", "梁板", "钢丝绳", "支座",
  948. "地基", "安全装置", "操作证", "合格证", "静载", "动载", "空载",
  949. )
  950. for term in domain_terms:
  951. if term in text:
  952. keywords.append(term)
  953. # 模式4:术语+动作组合
  954. for match in re.findall(r"[一-鿿A-Za-z0-9.-]{0,12}(?:验收|标准|规范|检查|检测|试验|安装|拆除|吊装|要求)", text):
  955. if 2 <= len(match) <= 20:
  956. keywords.append(match)
  957. # 模式5:分词后含领域术语的片段
  958. normalized = re.sub(r"[\s,,。;;::、/\\|()\[\]{}<>《》\"'""??]+", " ", text)
  959. for token in normalized.split():
  960. token = token.strip()
  961. if len(token) < 2 or len(token) > 12:
  962. continue
  963. if any(term in token for term in domain_terms):
  964. keywords.append(token)
  965. seen = set()
  966. unique = []
  967. for keyword in keywords:
  968. keyword = keyword.strip()
  969. if keyword and keyword not in seen:
  970. seen.add(keyword)
  971. unique.append(keyword)
  972. return unique
  973. def _pack_log_items(items: List[Dict[str, Any]], limit: int = 20, text_limit: int = 1500) -> List[Dict[str, Any]]:
  974. """打包候选条目为日志格式,限制条数和文本长度。"""
  975. packed = []
  976. for item in (items or [])[:limit]:
  977. if not isinstance(item, dict):
  978. continue
  979. metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
  980. text = str(item.get("text") or item.get("text_content") or item.get("content") or "").strip()
  981. packed.append(
  982. {
  983. "candidate_key": item.get("candidate_key"),
  984. "source": item.get("source") or metadata.get("file_name") or "",
  985. "text": text[:text_limit],
  986. "vector_similarity": _to_float(item.get("vector_similarity", item.get("similarity")), 0.0),
  987. "fusion_score": _to_float(item.get("fusion_score"), 0.0),
  988. "rerank_score": _to_float(item.get("rerank_score"), 0.0) if "rerank_score" in item else None,
  989. "source_hits": item.get("source_hits") if isinstance(item.get("source_hits"), dict) else {},
  990. "metadata": {
  991. key: metadata.get(key)
  992. for key in (
  993. "document_id", "parent_id", "file_name", "chapter_title",
  994. "chapter_level_1", "chapter_level_2", "chapter_level_3",
  995. "parent_count", "child_hit_count", "matched_child_texts",
  996. "tag_match_terms", "source_scope_valid",
  997. )
  998. if metadata.get(key) not in (None, "")
  999. },
  1000. }
  1001. )
  1002. return packed
  1003. def _to_int(value: Any, default: int) -> int:
  1004. """安全整数转换。"""
  1005. try:
  1006. return int(value)
  1007. except (TypeError, ValueError):
  1008. return default
  1009. def _to_float(value: Any, default: float = 0.0) -> float:
  1010. """安全浮点数转换。"""
  1011. try:
  1012. return float(value)
  1013. except (TypeError, ValueError):
  1014. return default
  1015. def _content_hash(text: str) -> str:
  1016. """基于归一化文本的短 MD5 哈希,用于内容去重。"""
  1017. normalized = re.sub(r"\s+", " ", text.strip().lower())
  1018. return md5(normalized.encode("utf-8")).hexdigest()[:12]