import asyncio
import concurrent.futures

from foundation.observability.monitoring.time_statistics import track_execution_time
from foundation.ai.rag.retrieval.retrieval import retrieval_manager
from foundation.observability.logger.loggering import review_logger as server_logger
from foundation.infrastructure.config.config import config_handler


class ReviewPointRetriever:
    """Review-point retriever: searches specification clauses directly from
    construction-plan text.

    Replaces the former EntitiesEnhance (entity-enhanced retrieval). Flow:

    1. For each search query, run the multi-stage recall (hybrid search plus
       first-pass rerank) against the children collection.
    2. Merge the candidates of all search queries and de-duplicate them by
       ``text_content``.
    3. Keep only candidates whose ``rerank_score`` exceeds
       ``RERANK_SCORE_THRESHOLD``.
    4. Rerank a second time using ``original_text`` as the semantic anchor.
    5. Return the top-K results, each tagged with ``source_entity``.

    Key improvements over the old flow:

    - Skips the intermediate ENTITY_COLLECTION hop and searches the
      specification-clause collection directly.
    - Uses the original excerpt (rather than an LLM-summarised background)
      for the second rerank.
    """

    # Tunables; the defaults reproduce the previously hard-coded values.
    HYBRID_TOP_K = 10             # hybrid_top_k passed to the recall call
    RECALL_TOP_K = 5              # top_k passed to the recall call
    FINAL_TOP_K = 5               # max results returned per review point
    RERANK_SCORE_THRESHOLD = 0.5  # candidates must score strictly above this
    MIN_ANCHOR_TEXT_LENGTH = 10   # original_text must be longer than this to rerank

    def __init__(self):
        self.result_lists = []
        # Recall results keyed by query. Unbounded: entries stay until
        # clear_cache() is called.
        self._search_cache = {}

    def _get_cache_key(self, query: str) -> str:
        """Return the cache key for *query*.

        The full query is used. Truncating it would let two long queries that
        share a prefix be served each other's cached results.
        """
        return f"search::{query}"

    def _get_children_collection(self) -> str:
        """Return the configured children collection name."""
        return config_handler.get('rag_collections', 'CHILDREN_COLLECTION', 'rag_children_hybrid')

    @staticmethod
    def _run_async(coro):
        """Run *coro* to completion from synchronous code and return its result.

        If no event loop is running in the current thread, ``asyncio.run`` is
        used directly. Otherwise the coroutine is executed with
        ``asyncio.run`` on a worker thread, and this call blocks until it
        finishes.

        Only the loop probe is guarded by ``try``: a ``RuntimeError`` raised
        by the coroutine itself must propagate instead of triggering a second
        run of an already-consumed coroutine.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread.
            return asyncio.run(coro)

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            return executor.submit(asyncio.run, coro).result()

    @staticmethod
    def _dedupe_by_text(items):
        """Return *items* with empty-text and duplicate ``text_content``
        entries removed, keeping first occurrences in order."""
        seen_texts = set()
        unique_items = []
        for item in items:
            text = item.get('text_content', '')
            if text and text not in seen_texts:
                seen_texts.add(text)
                unique_items.append(item)
        return unique_items

    def _recall_candidates(self, search_queries, collection_name):
        """Collect recall results for every query, using the search cache."""
        candidates = []
        for query in search_queries:
            cache_key = self._get_cache_key(query)
            if cache_key in self._search_cache:
                query_results = self._search_cache[cache_key]
                server_logger.info(f"[缓存命中] search_query: {query[:30]}...")
            else:
                query_results = self._run_async(
                    retrieval_manager.async_multi_stage_recall(
                        collection_name=collection_name,
                        query_text=query,
                        hybrid_top_k=self.HYBRID_TOP_K,
                        top_k=self.RECALL_TOP_K,
                    )
                )
                self._search_cache[cache_key] = query_results
                server_logger.info(
                    f"[检索完成] search_query: {query[:30]}... "
                    f"召回 {len(query_results)} 个候选"
                )
            candidates.extend(query_results)
        return candidates

    @track_execution_time
    def review_point_retrieval(self, review_points):
        """Retrieve specification clauses for each review point.

        Replaces ``entities_enhance_retrieval``.

        Args:
            review_points: list of review-point dicts. Keys read here:
                - label (legacy: entity): tag of the review point
                - search_queries (legacy: search_keywords): list of
                  retrieval queries
                - original_text (legacy: background): original excerpt
                ``None`` is treated as an empty list.

        Returns:
            list[list[dict]]: one sub-list per review point, in input order.
            A sub-list is empty when no query returned anything or no
            candidate scored above ``RERANK_SCORE_THRESHOLD``. The same list
            is also stored on ``self.result_lists``.
        """
        self.result_lists = []
        children_collection = self._get_children_collection()
        threshold = self.RERANK_SCORE_THRESHOLD

        for point_idx, point in enumerate(review_points or []):
            # Accept both new and legacy field names; tolerate explicit None.
            label = point.get('label', point.get('entity', ''))
            search_queries = point.get('search_queries', point.get('search_keywords', [])) or []
            original_text = point.get('original_text', point.get('background', '')) or ''

            server_logger.info(
                f"正在处理审查要点 [{point_idx}]: {label}, "
                f"检索语句数: {len(search_queries)}, "
                f"原文长度: {len(original_text)}"
            )

            # Step 1: recall (hybrid search + first-pass rerank) per query.
            all_candidates = self._recall_candidates(search_queries, children_collection)
            if not all_candidates:
                server_logger.warning(f"审查要点 '{label}' 所有检索语句均无结果")
                self.result_lists.append([])
                continue

            # Step 2: de-duplicate on text_content.
            unique_candidates = self._dedupe_by_text(all_candidates)
            server_logger.info(
                f"审查要点 '{label}': 合并 {len(all_candidates)} 个候选, "
                f"去重后 {len(unique_candidates)} 个"
            )

            # Step 3: keep only candidates scoring above the threshold.
            high_score = [
                c for c in unique_candidates
                if (c.get('rerank_score') or 0) > threshold
            ]
            if not high_score:
                # No relevant specification for this review point: record an
                # empty result and move on.
                max_score = max(
                    ((c.get('rerank_score') or 0) for c in unique_candidates),
                    default=0,
                )
                server_logger.warning(
                    f"审查要点 '{label}': 无高分候选(>{threshold}), 共 {len(unique_candidates)} 个候选均低于阈值, "
                    f"最高分={max_score:.4f}, 跳过该审查要点"
                )
                self.result_lists.append([])
                continue

            # Step 4: second rerank, anchored on the original excerpt.
            if (
                original_text
                and len(original_text) > self.MIN_ANCHOR_TEXT_LENGTH
                and len(high_score) > 1
            ):
                final_results = self._secondary_rerank(
                    original_text, high_score, top_k=self.FINAL_TOP_K
                )
                server_logger.info(
                    f"审查要点 '{label}': 二次重排序完成, "
                    f"返回 {len(final_results)} 个结果"
                )
            else:
                final_results = high_score[:self.FINAL_TOP_K]
                server_logger.info(
                    f"审查要点 '{label}': 跳过二次重排序 "
                    f"(原文长度={len(original_text)}, 候选数={len(high_score)})"
                )

            # Step 5: tag each result with its source (backward compat).
            # Work on copies: the candidate dicts may be the very objects held
            # in the search cache, so tagging them in place would leak this
            # label into the cache and into results already returned for
            # other review points that used the same query.
            tagged_results = []
            for result in final_results:
                tagged = result.copy()
                tagged['source_entity'] = label
                tagged_results.append(tagged)

            self.result_lists.append(tagged_results)

        return self.result_lists

    def _secondary_rerank(self, original_text, candidates, top_k=5):
        """Rerank *candidates* using *original_text* as the query.

        Args:
            original_text: original excerpt used as the rerank query.
            candidates: candidate dicts; ``text_content`` is the text that
                gets reranked.
            top_k: number of results requested from the reranker and the cap
                applied on the fallback paths.

        Returns:
            list[dict]: shallow copies of the matched candidates in reranker
            order, each with ``bfp_rerank_score`` and
            ``bfp_rerank_parent_id`` added. If there is no candidate text or
            the reranker raises, ``candidates[:top_k]`` is returned unchanged.
        """
        # Distinct, non-empty candidate texts in first-seen order.
        candidate_texts = [
            item.get('text_content', '') for item in self._dedupe_by_text(candidates)
        ]
        if not candidate_texts:
            return candidates[:top_k]

        try:
            rerank_results = retrieval_manager._get_rerank_results(
                original_text, candidate_texts, top_k
            )
        except Exception as e:
            # Best effort: fall back to the first-pass order.
            server_logger.error(f"二次重排序失败: {e}")
            return candidates[:top_k]

        # Map each text back to the candidate(s) that carry it.
        text_to_items = {}
        for item in candidates:
            text_to_items.setdefault(item.get('text_content', ''), []).append(item)

        final_results = []
        added_texts = set()
        for rerank_item in rerank_results or []:
            text = rerank_item.get('text', '')
            if text not in text_to_items or text in added_texts:
                continue

            # Among duplicates, prefer the best first-pass score; a missing
            # or None score counts as 0.
            best_candidate = max(
                text_to_items[text],
                key=lambda x: x.get('rerank_score') or 0.0
            )
            result_item = best_candidate.copy()
            # Second-pass rerank score (backward compat field name).
            result_item['bfp_rerank_score'] = rerank_item.get('score', 0.0)
            result_item['bfp_rerank_parent_id'] = (
                result_item.get('metadata') or {}
            ).get('parent_id', '')
            final_results.append(result_item)
            added_texts.add(text)

        return final_results

    def clear_cache(self):
        """Empty the search-result cache."""
        self._search_cache.clear()
        server_logger.info("[缓存清理] 审查要点检索缓存已清空")

    # Backward compatibility: legacy callers of entities_enhance_retrieval
    # are forwarded automatically.
    def entities_enhance_retrieval(self, query_pairs):
        """Legacy entry point; forwards to ``review_point_retrieval``."""
        return self.review_point_retrieval(query_pairs)


# Global instance (new name).
review_point_retriever = ReviewPointRetriever()

# Backward compatibility: legacy code importing ``entity_enhance`` keeps working.
entity_enhance = review_point_retriever