retrieval_service.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. # -*- coding: utf-8 -*-
  2. """Quality-first vector retrieval for document chat."""
  3. from __future__ import annotations
  4. from dataclasses import dataclass
  5. from pathlib import Path
  6. from typing import Any, Dict, List, Optional
  7. import yaml
  8. from foundation.observability.logger.loggering import write_logger as logger
# Directory three levels above the directory containing this file.
# NOTE(review): presumably the repository root — confirm against the package layout.
PROJECT_ROOT = Path(__file__).resolve().parents[3]
# YAML file read by load_retrieval_config(); built-in defaults apply when it is absent.
RETRIEVAL_CONFIG_PATH = PROJECT_ROOT / "config" / "document_chat_retrieval.yaml"
  11. @dataclass(frozen=True)
  12. class RetrievalConfig:
  13. enabled: bool = True
  14. child_collection: str = "t_kngs_construction_plan_child"
  15. recall_top_k: int = 30
  16. rerank_top_k: int = 8
  17. submit_top_k: int = 3
  18. min_vector_similarity: float = 0.45
  19. min_rerank_score: float = 0.70
  20. min_qualified_count: int = 1
  21. max_reference_chars: int = 4000
  22. max_single_reference_chars: int = 1500
  23. allow_vector_fallback: bool = False
  24. allow_unscoped_search: bool = False
  25. dense_weight: float = 0.7
  26. sparse_weight: float = 0.3
  27. ranker_type: str = "weighted"
  28. warnings: Dict[str, str] = None
  29. def load_retrieval_config() -> RetrievalConfig:
  30. if not RETRIEVAL_CONFIG_PATH.exists():
  31. return RetrievalConfig(warnings=_default_warnings())
  32. with open(RETRIEVAL_CONFIG_PATH, "r", encoding="utf-8") as handle:
  33. raw = yaml.safe_load(handle) or {}
  34. retrieval = raw.get("retrieval") or {}
  35. warnings = raw.get("warnings") or _default_warnings()
  36. return RetrievalConfig(
  37. enabled=bool(retrieval.get("enabled", True)),
  38. child_collection=str(retrieval.get("child_collection") or "t_kngs_construction_plan_child"),
  39. recall_top_k=_to_int(retrieval.get("recall_top_k"), 30),
  40. rerank_top_k=_to_int(retrieval.get("rerank_top_k"), 8),
  41. submit_top_k=_to_int(retrieval.get("submit_top_k"), 3),
  42. min_vector_similarity=_to_float(retrieval.get("min_vector_similarity"), 0.45),
  43. min_rerank_score=_to_float(retrieval.get("min_rerank_score"), 0.70),
  44. min_qualified_count=_to_int(retrieval.get("min_qualified_count"), 1),
  45. max_reference_chars=_to_int(retrieval.get("max_reference_chars"), 4000),
  46. max_single_reference_chars=_to_int(retrieval.get("max_single_reference_chars"), 1500),
  47. allow_vector_fallback=bool(retrieval.get("allow_vector_fallback", False)),
  48. allow_unscoped_search=bool(retrieval.get("allow_unscoped_search", False)),
  49. dense_weight=_to_float(retrieval.get("dense_weight"), 0.7),
  50. sparse_weight=_to_float(retrieval.get("sparse_weight"), 0.3),
  51. ranker_type=str(retrieval.get("ranker_type") or "weighted"),
  52. warnings=warnings,
  53. )
  54. class DocumentChatRetrievalService:
  55. """Build retrieval queries and fetch quality candidates.
  56. Retrieval is intentionally conservative: when no reliable scope is present
  57. and unscoped search is disabled, it returns no candidates.
  58. """
  59. def __init__(self, config: Optional[RetrievalConfig] = None):
  60. self.config = config or load_retrieval_config()
  61. def build_query(self, state: Dict[str, Any]) -> str:
  62. selected_section = state.get("selected_section") or {}
  63. project_info = state.get("project_info") or {}
  64. intent_result = state.get("intent_result") or {}
  65. section_content = str(selected_section.get("content") or "")
  66. section_preview = section_content[:1000]
  67. parts = [
  68. f"项目名称:{project_info.get('project_name') or project_info.get('name') or ''}",
  69. f"工程类型:{project_info.get('engineering_type') or project_info.get('project_type') or ''}",
  70. f"施工位置:{project_info.get('construct_location') or project_info.get('location') or ''}",
  71. f"章节:{selected_section.get('index', '')} {selected_section.get('title', '')}",
  72. f"用户需求:{state.get('user_message') or ''}",
  73. f"归一化需求:{intent_result.get('normalized_instruction') or ''}",
  74. f"当前章节摘要:{section_preview}",
  75. ]
  76. return "\n".join(part for part in parts if part.split(":", 1)[-1].strip())
  77. def recall(self, state: Dict[str, Any]) -> Dict[str, Any]:
  78. if not self.config.enabled:
  79. return self._empty_result("disabled", [], retrieval_method="disabled")
  80. query = str(state.get("retrieval_query") or "").strip()
  81. if not query:
  82. return self._empty_result("no_recall", [self._warning("no_recall")], retrieval_method="empty_query")
  83. scope = self._extract_scope(state)
  84. if not self._has_reliable_scope(scope) and not self.config.allow_unscoped_search:
  85. return self._empty_result(
  86. "no_scope",
  87. [self._warning("no_scope")],
  88. retrieval_method="no_scope",
  89. retrieval_scope=scope,
  90. )
  91. try:
  92. if scope.get("chapter_level_1") and scope.get("chapter_level_2"):
  93. retrieval_method = "chapter_similarity"
  94. candidates = self._recall_by_chapter(scope, query)
  95. else:
  96. retrieval_method = "milvus_hybrid_vector"
  97. candidates = self._recall_by_vector(scope, query)
  98. except Exception as exc:
  99. logger.warning(f"[DocumentChat] retrieval failed: {exc}", exc_info=True)
  100. return self._empty_result(
  101. "no_recall",
  102. [self._warning("no_recall")],
  103. retrieval_method=retrieval_method if "retrieval_method" in locals() else "unknown",
  104. retrieval_scope=scope,
  105. )
  106. candidates = self._clean_candidates(candidates)
  107. if not candidates:
  108. return self._empty_result(
  109. "no_recall",
  110. [self._warning("no_recall")],
  111. retrieval_method=retrieval_method,
  112. retrieval_scope=scope,
  113. )
  114. metrics = {
  115. "recall_count": len(candidates),
  116. "max_vector_similarity": max((item.get("vector_similarity", 0.0) for item in candidates), default=0.0),
  117. "scope": {key: value for key, value in scope.items() if value},
  118. "retrieval_method": retrieval_method,
  119. }
  120. return {
  121. "retrieval_candidates": candidates,
  122. "retrieval_status": "recalled",
  123. "retrieval_method": retrieval_method,
  124. "retrieval_metrics": metrics,
  125. "warnings": [],
  126. }
  127. def _recall_by_chapter(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
  128. from core.construction_write.component.similar_fragment_service import search_similar_fragments
  129. rows = search_similar_fragments(
  130. level1=str(scope.get("chapter_level_1") or ""),
  131. level2=str(scope.get("chapter_level_2") or ""),
  132. search_text=query,
  133. top_k=self.config.recall_top_k,
  134. )
  135. candidates = []
  136. for row in rows:
  137. text = str(row.get("text") or "").strip()
  138. metadata = {
  139. "tenant_id": scope.get("tenant_id") or "",
  140. "project_id": scope.get("project_id") or "",
  141. "knowledge_base_id": scope.get("knowledge_base_id") or "",
  142. "file_name": row.get("file_name") or "",
  143. "chapter_level_1": row.get("chapter_level_1") or scope.get("chapter_level_1") or "",
  144. "chapter_level_2": row.get("chapter_level_2") or scope.get("chapter_level_2") or "",
  145. "parent_count": row.get("parent_count", 0),
  146. "source_scope_valid": True,
  147. }
  148. candidates.append(
  149. {
  150. "text": text,
  151. "source": metadata.get("file_name") or "向量知识库",
  152. "vector_similarity": _to_float(row.get("similarity"), 0.0),
  153. "metadata": metadata,
  154. }
  155. )
  156. return candidates
  157. def _recall_by_vector(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
  158. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  159. expr = self._build_filter_expr(scope)
  160. if not expr and not self.config.allow_unscoped_search:
  161. return []
  162. results = MilvusVectorManager().hybrid_search(
  163. param={"collection_name": self.config.child_collection, "expr": expr},
  164. query_text=query,
  165. top_k=self.config.recall_top_k,
  166. ranker_type=self.config.ranker_type,
  167. dense_weight=self.config.dense_weight,
  168. sparse_weight=self.config.sparse_weight,
  169. )
  170. candidates = []
  171. for row in results:
  172. metadata = self._normalize_metadata(row.get("metadata") or {})
  173. source_scope_valid = self._metadata_matches_scope(metadata, scope)
  174. metadata["source_scope_valid"] = source_scope_valid
  175. candidates.append(
  176. {
  177. "text": str(row.get("text_content") or "").strip(),
  178. "source": metadata.get("file_name") or metadata.get("title") or "向量知识库",
  179. "vector_similarity": _to_float(row.get("similarity"), 0.0),
  180. "metadata": metadata,
  181. }
  182. )
  183. return candidates
  184. def _extract_scope(self, state: Dict[str, Any]) -> Dict[str, Any]:
  185. selected = state.get("selected_section") or {}
  186. context = state.get("document_context") or {}
  187. project = state.get("project_info") or {}
  188. filters = context.get("retrieval_filters") if isinstance(context.get("retrieval_filters"), dict) else {}
  189. filters = filters or project.get("retrieval_filters") if isinstance(project.get("retrieval_filters"), dict) else filters
  190. def pick(*keys: str) -> str:
  191. for source in (selected, context, project, filters or {}):
  192. for key in keys:
  193. value = source.get(key) if isinstance(source, dict) else None
  194. if value not in (None, ""):
  195. return str(value).strip()
  196. return ""
  197. return {
  198. "tenant_id": pick("tenant_id"),
  199. "project_id": pick("project_id"),
  200. "knowledge_base_id": pick("knowledge_base_id", "kb_id"),
  201. "engineering_type": pick("engineering_type", "project_type"),
  202. "chapter_level_1": pick("chapter_level_1", "level1"),
  203. "chapter_level_2": pick("chapter_level_2", "level2"),
  204. }
  205. @staticmethod
  206. def _has_reliable_scope(scope: Dict[str, Any]) -> bool:
  207. if scope.get("chapter_level_1") and scope.get("chapter_level_2"):
  208. return True
  209. return bool(scope.get("tenant_id") or scope.get("project_id") or scope.get("knowledge_base_id"))
  210. def _build_filter_expr(self, scope: Dict[str, Any]) -> str:
  211. conditions = []
  212. for key in ("tenant_id", "project_id", "knowledge_base_id", "engineering_type", "chapter_level_1", "chapter_level_2"):
  213. value = str(scope.get(key) or "").strip()
  214. if value:
  215. conditions.append(f"{key} == '{_escape_milvus_string(value)}'")
  216. return " and ".join(conditions)
  217. @staticmethod
  218. def _metadata_matches_scope(metadata: Dict[str, Any], scope: Dict[str, Any]) -> bool:
  219. required_keys = ["tenant_id", "project_id", "knowledge_base_id", "chapter_level_1", "chapter_level_2"]
  220. for key in required_keys:
  221. expected = str(scope.get(key) or "").strip()
  222. if not expected:
  223. continue
  224. actual = str(metadata.get(key) or "").strip()
  225. if actual and actual != expected:
  226. return False
  227. return True
  228. @staticmethod
  229. def _normalize_metadata(metadata: Any) -> Dict[str, Any]:
  230. if isinstance(metadata, dict):
  231. return dict(metadata)
  232. if isinstance(metadata, str) and metadata.strip():
  233. try:
  234. loaded = yaml.safe_load(metadata)
  235. return dict(loaded) if isinstance(loaded, dict) else {}
  236. except Exception:
  237. return {}
  238. return {}
  239. def _clean_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  240. cleaned = []
  241. seen = set()
  242. for item in candidates:
  243. text = str(item.get("text") or "").strip()
  244. if len(text) < 20:
  245. continue
  246. dedupe_key = text[:300]
  247. if dedupe_key in seen:
  248. continue
  249. seen.add(dedupe_key)
  250. metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
  251. cleaned.append(
  252. {
  253. "text": text[: self.config.max_single_reference_chars],
  254. "source": str(item.get("source") or metadata.get("file_name") or "向量知识库"),
  255. "vector_similarity": _to_float(item.get("vector_similarity"), 0.0),
  256. "metadata": metadata,
  257. }
  258. )
  259. cleaned.sort(key=lambda item: item.get("vector_similarity", 0.0), reverse=True)
  260. return cleaned[: self.config.recall_top_k]
  261. def _empty_result(
  262. self,
  263. status: str,
  264. warnings: List[str],
  265. retrieval_method: str = "",
  266. retrieval_scope: Optional[Dict[str, Any]] = None,
  267. ) -> Dict[str, Any]:
  268. return {
  269. "retrieval_candidates": [],
  270. "retrieval_status": status,
  271. "retrieval_method": retrieval_method,
  272. "retrieval_metrics": {
  273. "recall_count": 0,
  274. "retrieval_method": retrieval_method,
  275. "scope": {key: value for key, value in (retrieval_scope or {}).items() if value},
  276. },
  277. "warnings": warnings,
  278. }
  279. def _warning(self, key: str) -> str:
  280. warnings = self.config.warnings or _default_warnings()
  281. return warnings.get(key) or _default_warnings().get(key) or ""
  282. def _default_warnings() -> Dict[str, str]:
  283. return {
  284. "no_scope": "缺少可靠的知识库检索范围,本次未引用向量库内容。",
  285. "no_recall": "未召回可信知识库内容,本次回答不引用向量库。",
  286. "low_confidence": "未找到可信度足够的知识库片段,本次未引用向量库内容。",
  287. "rerank_failed": "知识库片段重排不可用,本次未引用向量库内容。",
  288. }
  289. def _escape_milvus_string(value: str) -> str:
  290. return str(value).replace("\\", "\\\\").replace("'", "\\'")
  291. def _to_int(value: Any, default: int) -> int:
  292. try:
  293. return int(value)
  294. except (TypeError, ValueError):
  295. return default
  296. def _to_float(value: Any, default: float) -> float:
  297. try:
  298. return float(value)
  299. except (TypeError, ValueError):
  300. return default