# -*- coding: utf-8 -*-
"""Quality-first multi-path vector retrieval service.

Four recall paths:
1. parent_vector: vector search over the parent collection (main content vectors).
2. child_locator: vector search over the child collection to locate fragments,
   then look up the owning parent rows (precise fragment positioning).
3. tag: tag keyword matching (equipment models, standard numbers, etc.).
4. chapter_similarity: chapter-similarity search (reference chapters of the same type).

Merge strategy (implemented in ``retrieval.fusion``):
- RRF (Reciprocal Rank Fusion) combines the per-path rankings.
- Per-path weights: parent_vector 1.0, child_locator 0.8, tag 1.2, chapter 0.5.
- Multi-source bonus: a candidate recalled by several paths gets extra score.
- Tag-match bonus: extra score when keywords appear in tag_list or in the text.
- Scope-match bonus: extra score when the candidate matches the current project/chapter scope.

Deduplication (implemented in ``retrieval.candidate``):
- candidate_key dedup (based on document_id + parent_id + chunk_id).
- Content-hash dedup (only one entry is kept per identical text within the same file).
"""

from __future__ import annotations

from typing import Any, Callable, Dict, List, Optional

from core.document_chat.component.document_chat_logger import document_chat_logger as logger
from core.document_chat.retrieval.candidate import (
    build_candidate_key,
    clean_candidates,
    merge_metadata,
    metadata_value,
    normalize_metadata,
    normalize_row_metadata,
)
from core.document_chat.retrieval.config import RetrievalConfig, default_warnings, load_retrieval_config
from core.document_chat.retrieval.fusion import calc_tag_bonus, merge_recall_results
from core.document_chat.retrieval.query_builder import (
    build_query as build_retrieval_query,
    build_query_keywords as build_retrieval_query_keywords,
)
from core.document_chat.retrieval.scope import (
    build_filter_expr,
    build_tag_expr,
    extract_scope,
    has_reliable_scope,
    metadata_matches_scope,
    select_tag_terms,
)
from core.document_chat.retrieval.utils import (
    combine_expr as _combine_expr,
    escape_milvus_string as _escape_milvus_string,
    pack_log_items as _pack_log_items,
    to_float as _to_float,
)


class DocumentChatRetrievalService:
    """Build retrieval queries and recall high-quality candidates from the vector store.

    Core flow:
    1. build_query: combine user input, chapter info and intent into a retrieval query.
    2. recall: run multi-path recall -> RRF merge -> deduplication.
    """

    # Output fields requested when querying the parent collection.
    PARENT_OUTPUT_FIELDS = [
        "pk",
        "text",
        "document_id",
        "parent_id",
        "index",
        "tag_list",
        "metadata",
        "file_name",
        "chapter_title",
        "chapter_level_1",
        "chapter_level_2",
        "chapter_level_3",
    ]
    # Output fields requested when querying the child collection (currently the same as the parent's).
    CHILD_OUTPUT_FIELDS = [
        "pk",
        "text",
        "document_id",
        "parent_id",
        "index",
        "tag_list",
        "metadata",
        "file_name",
        "chapter_title",
        "chapter_level_1",
        "chapter_level_2",
        "chapter_level_3",
    ]

    # Multiplier applied to every tag-path similarity: tag matching tends to
    # over-match, so those candidates are discounted before fusion.
    TAG_SIMILARITY_DISCOUNT = 0.7

    def __init__(self, config: Optional[RetrievalConfig] = None):
        """Use the given config, or load the default retrieval config when none is given."""
        self.config = config or load_retrieval_config()

    # ============================================================
    # Query building
    # ============================================================

    def build_query(self, state: Dict[str, Any]) -> str:
        """Build a concise retrieval query, avoiding redundant project summaries."""
        return build_retrieval_query(
            state,
            domain_terms=self.config.keyword_domain_terms,
            action_terms=self.config.keyword_action_terms,
        )

    def build_query_keywords(self, state: Dict[str, Any], query: Optional[str] = None) -> List[str]:
        """Extract retrieval keywords from several sources in the state (and the optional query)."""
        return build_retrieval_query_keywords(
            state,
            query,
            domain_terms=self.config.keyword_domain_terms,
            action_terms=self.config.keyword_action_terms,
        )

    # ============================================================
    # Main recall entry point
    # ============================================================

    def recall(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Run multi-path vector recall, fuse the paths with RRF, then dedupe/filter.

        Args:
            state: workflow state. Reads ``retrieval_query`` (required for any
                recall), optionally ``retrieval_keywords``, plus whatever
                ``extract_scope`` reads to build the retrieval scope.

        Returns:
            A dict with:
            - retrieval_candidates: deduplicated candidate list
            - retrieval_status: recalled / no_scope / no_recall / disabled
            - retrieval_method: how the result was produced (e.g. multi_path_rrf)
            - retrieval_metrics: per-path recall statistics
            - retrieval_steps: detailed log entry for every step
            - warnings: warning messages (empty on success)
        """
        if not self.config.enabled:
            return self._empty_result("disabled", [], retrieval_method="disabled")

        query = str(state.get("retrieval_query") or "").strip()
        if not query:
            return self._empty_result("no_recall", [self._warning("no_recall")], retrieval_method="empty_query")

        # Extract the retrieval scope (project id, engineering type, chapter classification, ...).
        scope = self._extract_scope(state)
        if not self._has_reliable_scope(scope) and not self.config.allow_unscoped_search:
            return self._empty_result(
                "no_scope",
                [self._warning("no_scope")],
                retrieval_method="no_scope",
                retrieval_scope=scope,
            )

        keywords = list(state.get("retrieval_keywords") or self.build_query_keywords(state, query))
        retrieval_steps: List[Dict[str, Any]] = []
        source_results: Dict[str, List[Dict[str, Any]]] = {}

        # ===== Four recall paths; a failing path is logged and contributes [] =====
        source_results["parent_vector"] = self._run_recall_path(
            "parent_vector",
            lambda: self._recall_by_parent_vector(scope, query),
            retrieval_steps,
            query=query,
            scope=scope,
        )
        source_results["child_locator"] = self._run_recall_path(
            "child_locator",
            lambda: self._recall_by_child_locator(scope, query),
            retrieval_steps,
            query=query,
            scope=scope,
        )
        if self.config.tag_recall_enabled:
            source_results["tag"] = self._run_recall_path(
                "tag",
                lambda: self._recall_by_tag(scope, keywords),
                retrieval_steps,
                # Logged query only; str() guards against non-string keywords.
                query=" ".join(str(keyword) for keyword in keywords[: self.config.tag_terms_limit]),
                scope=scope,
            )
        # Chapter similarity needs both chapter levels to scope the search.
        if scope.get("chapter_level_1") and scope.get("chapter_level_2"):
            source_results["chapter_similarity"] = self._run_recall_path(
                "chapter_similarity",
                lambda: self._recall_by_chapter(scope, query),
                retrieval_steps,
                query=query,
                scope=scope,
            )

        # ===== RRF merge + dedup =====
        merged_candidates = self._merge_recall_results(source_results, scope, keywords)
        cleaned = self._clean_candidates(merged_candidates)
        retrieval_steps.append(
            {
                "step": "rrf_merge",
                "query": query,
                "scope": self._compact_scope(scope),
                "count": len(merged_candidates),
                "items": _pack_log_items(merged_candidates),
            }
        )
        retrieval_steps.append(
            {
                "step": "clean_candidates",
                "count": len(cleaned),
                "items": _pack_log_items(cleaned),
            }
        )

        source_counts = {source: len(items or []) for source, items in source_results.items()}
        if not cleaned:
            # Keep the pre-cleaning counts so an empty result can still be diagnosed.
            return self._empty_result(
                "no_recall",
                [self._warning("no_recall")],
                retrieval_method="multi_path_rrf",
                retrieval_scope=scope,
                retrieval_steps=retrieval_steps,
                extra_metrics={
                    "merged_count": len(merged_candidates),
                    "source_counts": source_counts,
                },
            )

        # _to_float tolerates missing/None scores, which would break max() or the :.4f format.
        max_vector_similarity = max(
            (_to_float(item.get("vector_similarity"), 0.0) for item in cleaned),
            default=0.0,
        )
        max_fusion_score = max(
            (_to_float(item.get("fusion_score"), 0.0) for item in cleaned),
            default=0.0,
        )

        # Log: distinguish the requested scope, the filter actually applied, and the files actually recalled.
        applied_expr = self._build_filter_expr(scope)
        actual_files = list(
            dict.fromkeys(str(item.get("source", ""))[:40] for item in cleaned if item.get("source"))
        )[:5]
        logger.info(
            f"[DocumentChat] recall completed: method=multi_path_rrf "
            f"requested_scope={self._compact_scope(scope)} "
            f"applied_filter='{applied_expr}' "
            f"actual_sources={actual_files} "
            f"source_counts={source_counts} "
            f"total={len(cleaned)} max_sim={max_vector_similarity:.4f}"
        )

        metrics = {
            "recall_count": len(cleaned),
            "merged_count": len(merged_candidates),
            "source_counts": source_counts,
            "max_vector_similarity": max_vector_similarity,
            "max_fusion_score": max_fusion_score,
            "scope": self._compact_scope(scope),
            "retrieval_method": "multi_path_rrf",
        }
        return {
            "retrieval_candidates": cleaned,
            "retrieval_steps": retrieval_steps,
            "retrieval_status": "recalled",
            "retrieval_method": "multi_path_rrf",
            "retrieval_metrics": metrics,
            "warnings": [],
        }

    def _run_recall_path(
        self,
        step: str,
        func: Callable[[], List[Dict[str, Any]]],
        retrieval_steps: List[Dict[str, Any]],
        query: str,
        scope: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Run one recall path and record a step entry; an exception never blocks the other paths.

        On success, appends a step with the candidate count and packed items and
        returns the candidates. On any exception, logs a warning (with traceback),
        appends a step carrying the error text, and returns [].
        """
        try:
            candidates = func() or []
            retrieval_steps.append(
                {
                    "step": step,
                    "query": query,
                    "scope": self._compact_scope(scope),
                    "count": len(candidates),
                    "items": _pack_log_items(candidates),
                }
            )
            return candidates
        except Exception as exc:
            # Deliberately broad: one broken path must not abort the whole recall.
            logger.warning(f"[DocumentChat] {step} recall failed: {exc}", exc_info=True)
            retrieval_steps.append(
                {
                    "step": step,
                    "query": query,
                    "scope": self._compact_scope(scope),
                    "count": 0,
                    "error": str(exc),
                    "items": [],
                }
            )
            return []

    # ============================================================
    # Recall path implementations
    # ============================================================

    def _recall_by_parent_vector(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
        """Parent-collection search: Milvus hybrid search (dense + sparse) returning parent rows directly.

        Rows with empty ``text_content`` are dropped.
        """
        from foundation.database.base.vector.milvus_vector import MilvusVectorManager

        expr = self._build_filter_expr(scope)
        results = MilvusVectorManager().hybrid_search(
            param={"collection_name": self.config.parent_collection, "expr": expr},
            query_text=query,
            top_k=self.config.parent_recall_top_k,
            ranker_type=self.config.ranker_type,
            dense_weight=self.config.dense_weight,
            sparse_weight=self.config.sparse_weight,
        )
        return [
            self._candidate_from_vector_row(row, "parent_vector", scope)
            for row in results or []
            if str(row.get("text_content") or "").strip()
        ]

    def _recall_by_child_locator(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
        """Child-collection locate + parent lookup.

        First searches the child collection with the query to find matching
        fragments, then fetches the parent rows by ``parent_id`` to get the
        full parent content. Child rows are finer-grained, so they pin down the
        matching passage while the candidate still carries the parent text.

        Each candidate's similarity is the best similarity among its child hits;
        its metadata records ``child_hit_count`` and up to 5 ``matched_child_texts``.
        """
        from foundation.database.base.vector.milvus_vector import MilvusVectorManager

        expr = self._build_filter_expr(scope)
        child_rows = MilvusVectorManager().hybrid_search(
            param={"collection_name": self.config.child_collection, "expr": expr},
            query_text=query,
            top_k=self.config.child_recall_top_k,
            ranker_type=self.config.ranker_type,
            dense_weight=self.config.child_dense_weight,
            sparse_weight=self.config.child_sparse_weight,
        )

        # Group child hits by parent_id; dict insertion order keeps the child ranking order.
        child_groups: Dict[str, List[Dict[str, Any]]] = {}
        for row in child_rows or []:
            metadata = self._normalize_row_metadata(row.get("metadata") or {})
            # Prefer parent_id from metadata; fall back to a top-level field, as the tag path does.
            parent_id = str(self._metadata_value(metadata, "parent_id") or row.get("parent_id") or "").strip()
            if not parent_id:
                continue
            child_groups.setdefault(parent_id, []).append(row)

        # Look the parents up by parent_id.
        parent_rows = self._fetch_parent_rows_by_parent_ids(list(child_groups), scope)
        candidates = []
        for parent_row in parent_rows:
            parent_id = str(parent_row.get("parent_id") or "").strip()
            matches = child_groups.get(parent_id) or []
            max_similarity = max((_to_float(item.get("similarity"), 0.0) for item in matches), default=0.0)
            candidate = self._candidate_from_parent_row(parent_row, "child_locator", scope, max_similarity)
            metadata = candidate.setdefault("metadata", {})
            metadata["child_hit_count"] = len(matches)  # number of child hits for this parent
            metadata["matched_child_texts"] = [
                str(item.get("text_content") or "").strip()
                for item in matches[:5]
                if str(item.get("text_content") or "").strip()
            ]
            candidates.append(candidate)
        return candidates

    def _recall_by_tag(self, scope: Dict[str, Any], keywords: List[str]) -> List[Dict[str, Any]]:
        """Tag keyword recall.

        Selects high-value terms (standard numbers, equipment names, ...) from the
        keywords and runs LIKE matching on the tag fields of both the parent and
        child collections (child hits are mapped back to their parent rows).

        Tag matching tends to over-match, so every result similarity (seeded with
        ``config.min_vector_similarity``) is multiplied by TAG_SIMILARITY_DISCOUNT.
        Each candidate's metadata records the matched ``tag_match_terms``.
        """
        tag_terms = self._select_tag_terms(keywords)
        if not tag_terms:
            return []

        tag_expr = self._build_tag_expr(tag_terms)
        scope_expr = self._build_filter_expr(scope)
        expr = _combine_expr(scope_expr, tag_expr)

        # Tag matches in the parent collection.
        parent_rows = self._condition_query(
            collection_name=self.config.parent_collection,
            filter_expr=expr,
            output_fields=self.PARENT_OUTPUT_FIELDS,
            limit=self.config.tag_recall_top_k,
        )
        candidates = [
            self._candidate_from_parent_row(row, "tag", scope, self.config.min_vector_similarity)
            for row in parent_rows
        ]

        # Tag matches in the child collection, then mapped back to parent rows.
        child_rows = self._condition_query(
            collection_name=self.config.child_collection,
            filter_expr=expr,
            output_fields=self.CHILD_OUTPUT_FIELDS,
            limit=self.config.tag_recall_top_k,
        )
        child_parent_ids: List[str] = []
        child_tag_map: Dict[str, List[str]] = {}
        for row in child_rows:
            parent_id = str(row.get("parent_id") or self._metadata_value(row, "parent_id") or "").strip()
            if not parent_id:
                continue
            child_parent_ids.append(parent_id)
            text = str(row.get("text") or "").strip()
            if text:
                child_tag_map.setdefault(parent_id, []).append(text)

        for row in self._fetch_parent_rows_by_parent_ids(child_parent_ids, scope):
            parent_id = str(row.get("parent_id") or "").strip()
            candidate = self._candidate_from_parent_row(row, "tag", scope, self.config.min_vector_similarity)
            metadata = candidate.setdefault("metadata", {})
            metadata["matched_child_texts"] = child_tag_map.get(parent_id, [])[:5]
            candidates.append(candidate)

        for candidate in candidates:
            # Discount tag results to limit over-matching.
            candidate["vector_similarity"] *= self.TAG_SIMILARITY_DISCOUNT
            # Record which tag terms actually occur in the tags, the text, or the matched child texts.
            metadata = candidate.setdefault("metadata", {})
            tag_text = " ".join(
                str(value or "")
                for value in (
                    metadata.get("tag_list"),
                    candidate.get("text"),
                    " ".join(metadata.get("matched_child_texts") or []),
                )
            )
            metadata["tag_match_terms"] = [term for term in tag_terms if term and term in tag_text]
        return candidates

    def _recall_by_chapter(self, scope: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
        """Chapter-similarity recall via the existing similar_fragment_service.

        Searches for similar fragments restricted to the scope's
        chapter_level_1 + chapter_level_2.
        """
        from core.construction_write.component.similar_fragment_service import search_similar_fragments

        rows = search_similar_fragments(
            level1=str(scope.get("chapter_level_1") or ""),
            level2=str(scope.get("chapter_level_2") or ""),
            search_text=query,
            top_k=self.config.chapter_recall_top_k,
        )
        candidates = []
        for row in rows or []:
            # NOTE(review): tenant/project/knowledge-base ids are copied from the scope,
            # not read from the row — confirm the fragment service only returns in-scope rows.
            metadata = {
                "tenant_id": scope.get("tenant_id") or "",
                "project_id": scope.get("project_id") or "",
                "knowledge_base_id": scope.get("knowledge_base_id") or "",
                "file_name": row.get("file_name") or "",
                "chapter_level_1": row.get("chapter_level_1") or scope.get("chapter_level_1") or "",
                "chapter_level_2": row.get("chapter_level_2") or scope.get("chapter_level_2") or "",
                "parent_count": row.get("parent_count", 0),
                # Constrained by chapter classification, so treated as scope-matching.
                "source_scope_valid": True,
            }
            text = str(row.get("text") or "").strip()
            candidates.append(
                {
                    "candidate_key": self._build_candidate_key({**row, "metadata": metadata}, text),
                    "text": text,
                    "source": metadata.get("file_name") or "向量知识库",
                    "vector_similarity": _to_float(row.get("similarity"), 0.0),
                    "fusion_score": 0.0,
                    "metadata": metadata,
                    "source_hits": {},
                    "retrieval_source": "chapter_similarity",
                }
            )
        return candidates

    # ============================================================
    # RRF merge
    # ============================================================

    def _merge_recall_results(
        self,
        source_results: Dict[str, List[Dict[str, Any]]],
        scope: Dict[str, Any],
        keywords: List[str],
    ) -> List[Dict[str, Any]]:
        """Fuse the per-path recall results with RRF (delegates to ``merge_recall_results``)."""
        return merge_recall_results(source_results, scope, keywords, self.config)

    # ============================================================
    # Milvus query helpers
    # ============================================================

    def _fetch_parent_rows_by_parent_ids(self, parent_ids: List[str], scope: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch parent rows for the given parent_ids, restricted by the scope filter.

        Ids are stripped, empty ones dropped, and duplicates removed (first
        occurrence order kept). At most ``config.recall_top_k`` ids are looked up,
        one condition query per id with a limit of 100 rows each.
        """
        unique_ids = list(
            dict.fromkeys(value for value in (str(parent_id or "").strip() for parent_id in parent_ids) if value)
        )

        rows: List[Dict[str, Any]] = []
        scope_expr = self._build_filter_expr(scope)
        for parent_id in unique_ids[: self.config.recall_top_k]:
            parent_expr = f"parent_id == '{_escape_milvus_string(parent_id)}'"
            expr = _combine_expr(parent_expr, scope_expr)
            rows.extend(
                self._condition_query(
                    collection_name=self.config.parent_collection,
                    filter_expr=expr,
                    output_fields=self.PARENT_OUTPUT_FIELDS,
                    limit=100,
                )
            )
        return rows

    def _condition_query(
        self,
        collection_name: str,
        filter_expr: str,
        output_fields: List[str],
        limit: int,
    ) -> List[Dict[str, Any]]:
        """Milvus condition (non-vector) query filtered by ``filter_expr``.

        Returns [] without querying when the filter expression is empty, so an
        unfiltered scan of the collection is never issued.
        """
        from core.construction_write.component.similar_fragment_service import get_milvus_manager

        if not filter_expr:
            return []
        return get_milvus_manager().condition_query(
            collection_name=collection_name,
            filter=filter_expr,
            output_fields=output_fields,
            limit=limit,
        ) or []

    # ============================================================
    # Candidate construction
    # ============================================================

    def _candidate_from_vector_row(self, row: Dict[str, Any], source: str, scope: Dict[str, Any]) -> Dict[str, Any]:
        """Build a standard candidate from a Milvus hybrid-search result row.

        Text comes from ``text_content`` (falling back to ``text``); similarity
        from ``similarity``; metadata is the row's normalized ``metadata`` field
        plus a ``source_scope_valid`` flag.
        """
        metadata = self._normalize_row_metadata(row.get("metadata") or {})
        text = str(row.get("text_content") or row.get("text") or "").strip()
        metadata["source_scope_valid"] = self._metadata_matches_scope(metadata, scope)
        return {
            "candidate_key": self._build_candidate_key(metadata, text),
            "text": text,
            "source": metadata.get("file_name") or metadata.get("title") or "向量知识库",
            "vector_similarity": _to_float(row.get("similarity"), 0.0),
            "fusion_score": 0.0,
            "metadata": metadata,
            "source_hits": {},
            "retrieval_source": source,
        }

    def _candidate_from_parent_row(
        self,
        row: Dict[str, Any],
        source: str,
        scope: Dict[str, Any],
        vector_similarity: float,
    ) -> Dict[str, Any]:
        """Build a standard candidate from a parent-collection row with a caller-supplied similarity."""
        metadata = self._normalize_row_metadata(row)
        text = str(row.get("text") or "").strip()
        metadata["source_scope_valid"] = self._metadata_matches_scope(metadata, scope)
        return {
            "candidate_key": self._build_candidate_key(metadata, text),
            "text": text,
            "source": metadata.get("file_name") or "向量知识库",
            "vector_similarity": _to_float(vector_similarity, 0.0),
            "fusion_score": 0.0,
            "metadata": metadata,
            "source_hits": {},
            "retrieval_source": source,
        }

    # ============================================================
    # Scope extraction and filtering
    # ============================================================

    @staticmethod
    def _compact_scope(scope: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return a new dict with only the truthy scope entries (used for logs and metrics)."""
        return {key: value for key, value in (scope or {}).items() if value}

    def _extract_scope(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Extract the retrieval scope from the workflow state."""
        return extract_scope(state)

    @staticmethod
    def _has_reliable_scope(scope: Dict[str, Any]) -> bool:
        """Return whether the scope is reliable enough to restrict the search."""
        return has_reliable_scope(scope)

    def _build_filter_expr(self, scope: Dict[str, Any]) -> str:
        """Build the Milvus filter expression that restricts the search by scope."""
        return build_filter_expr(scope)

    def _build_tag_expr(self, tag_terms: List[str]) -> str:
        """Build the tag LIKE query expression (term count capped by ``config.tag_terms_limit``)."""
        return build_tag_expr(tag_terms, self.config.tag_terms_limit)

    def _select_tag_terms(self, keywords: List[str]) -> List[str]:
        """Select high-value tag terms from the keywords using the configured generic/priority term lists."""
        return select_tag_terms(
            keywords,
            self.config.tag_terms_limit,
            generic_terms=self.config.tag_generic_terms,
            priority_terms=self.config.tag_priority_terms,
        )

    @staticmethod
    def _metadata_matches_scope(metadata: Dict[str, Any], scope: Dict[str, Any]) -> bool:
        """Return whether the candidate metadata is compatible with the current retrieval scope."""
        return metadata_matches_scope(metadata, scope)

    # ============================================================
    # Metadata handling
    # ============================================================

    def _normalize_row_metadata(self, row_or_metadata: Any) -> Dict[str, Any]:
        """Normalize row data into a single metadata dict (handles nested metadata and YAML strings)."""
        return normalize_row_metadata(row_or_metadata, self.PARENT_OUTPUT_FIELDS)

    @staticmethod
    def _normalize_metadata(metadata: Any) -> Dict[str, Any]:
        """Convert metadata to a dict, parsing YAML strings when needed."""
        return normalize_metadata(metadata)

    @staticmethod
    def _metadata_value(metadata: Dict[str, Any], key: str) -> Any:
        """Safely read a metadata value, supporting nested metadata.metadata and YAML strings."""
        return metadata_value(metadata, key)

    def _build_candidate_key(self, metadata: Dict[str, Any], text: Any = "") -> str:
        """Build the candidate's unique key, trying field combinations in priority order."""
        return build_candidate_key(metadata, text, self.PARENT_OUTPUT_FIELDS)

    def _merge_metadata(self, current: Dict[str, Any], incoming: Dict[str, Any]) -> None:
        """Merge the metadata of two candidates into ``current`` without overwriting non-empty values."""
        merge_metadata(current, incoming)

    # ============================================================
    # Bonus calculation
    # ============================================================

    def _calc_tag_bonus(self, candidate: Dict[str, Any], keywords: List[str]) -> float:
        """Tag-match bonus: exact tag_list matches score more than matches only in the text."""
        return calc_tag_bonus(candidate, keywords, self.config)

    # ============================================================
    # Candidate cleaning
    # ============================================================

    def _clean_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Clean candidates: drop too-short texts and apply two-stage dedup.

        Dedup strategy:
        1. candidate_key dedup: the same document + parent + chunk counts as one entry.
        2. Content-hash dedup: identical text within the same file is kept only once,
           even when it came from different paths.
        """
        return clean_candidates(candidates, self.config)

    # ============================================================
    # Empty results / warnings
    # ============================================================

    def _empty_result(
        self,
        status: str,
        warnings: List[str],
        retrieval_method: str = "",
        retrieval_scope: Optional[Dict[str, Any]] = None,
        retrieval_steps: Optional[List[Dict[str, Any]]] = None,
        extra_metrics: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Build an empty recall result.

        Args:
            status: value for ``retrieval_status``.
            warnings: warning messages to return.
            retrieval_method: value for ``retrieval_method`` (also copied into the metrics).
            retrieval_scope: scope to report; only truthy entries are kept.
            retrieval_steps: step log collected so far (defaults to []).
            extra_metrics: optional additional metric entries (e.g. merged_count,
                source_counts) merged into ``retrieval_metrics``.
        """
        metrics: Dict[str, Any] = {
            "recall_count": 0,
            "retrieval_method": retrieval_method,
            "scope": self._compact_scope(retrieval_scope),
        }
        if extra_metrics:
            metrics.update(extra_metrics)
        return {
            "retrieval_candidates": [],
            "retrieval_steps": retrieval_steps or [],
            "retrieval_status": status,
            "retrieval_method": retrieval_method,
            "retrieval_metrics": metrics,
            "warnings": warnings,
        }

    def _warning(self, key: str) -> str:
        """Return the warning text for ``key`` from the config (or the defaults); "" when absent."""
        warnings = self.config.warnings or default_warnings()
        return warnings.get(key) or ""