| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- import asyncio
- from foundation.observability.monitoring.time_statistics import track_execution_time
- from foundation.ai.rag.retrieval.retrieval import retrieval_manager
- from foundation.observability.logger.loggering import review_logger as server_logger
- from foundation.infrastructure.config.config import config_handler
class ReviewPointRetriever:
    """Retrieve specification clauses for review points taken from a construction plan.

    Replaces the older ``EntitiesEnhance`` (entity-enhanced retrieval) flow.
    For every review point the pipeline is:

    1. For each search query, run a multi-stage recall (hybrid search plus a
       first rerank) against the children collection.
    2. Merge the candidates of all queries and de-duplicate them by text.
    3. Keep only candidates whose first-pass ``rerank_score`` is above
       ``SCORE_THRESHOLD``.
    4. Rerank the survivors a second time, using the review point's
       ``original_text`` as the query (semantic anchor).
    5. Return the top results, each tagged with the review point's label.

    Compared with the old flow, the intermediate hop through the entity
    collection is skipped, and the second rerank uses the verbatim source
    excerpt instead of an LLM-written summary.
    """

    # Tunables; the values are the ones that used to be hard-coded inline.
    HYBRID_TOP_K = 10        # candidates requested from the hybrid search stage
    RECALL_TOP_K = 5         # candidates kept per query after the first rerank
    SCORE_THRESHOLD = 0.5    # minimum first-pass rerank_score to keep a candidate
    FINAL_TOP_K = 5          # results returned per review point
    MIN_ANCHOR_LENGTH = 10   # original_text must be longer than this to rerank with it

    def __init__(self):
        # Result of the most recent review_point_retrieval() call.
        self.result_lists = []
        # query -> recall results. Never evicted automatically; see clear_cache().
        self._search_cache = {}

    def _get_cache_key(self, query: str) -> str:
        """Return the cache key for *query*.

        The full query text is used: truncating it would make long queries
        that share a prefix collide and return each other's results.
        """
        return f"search::{query}"

    def _get_children_collection(self) -> str:
        """Return the configured children collection name.

        NOTE(review): the third argument looks like the default used when the
        config entry is missing -- confirm against ``config_handler.get``.
        """
        return config_handler.get('rag_collections', 'CHILDREN_COLLECTION', 'rag_children_hybrid')

    @staticmethod
    def _run_coroutine_sync(coro):
        """Run *coro* to completion from synchronous code and return its result.

        When the calling thread already has a running event loop,
        ``asyncio.run`` cannot be used there, so the coroutine is executed on
        a worker thread with its own loop while this thread blocks on the
        result. Otherwise ``asyncio.run`` is called directly.
        """
        # Only the loop probe is guarded: a RuntimeError raised by the
        # coroutine itself must propagate, not be mistaken for "no loop".
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if not loop_is_running:
            return asyncio.run(coro)

        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            return executor.submit(asyncio.run, coro).result()

    @staticmethod
    def _normalize_queries(raw_queries):
        """Coerce the ``search_queries`` field into a list of non-empty queries."""
        if not raw_queries:
            return []
        if isinstance(raw_queries, str):
            # A bare string would otherwise be iterated character by character.
            return [raw_queries]
        return [query for query in raw_queries if query]

    @staticmethod
    def _dedupe_by_text(candidates):
        """Drop candidates without ``text_content`` and keep the first item per text."""
        first_by_text = {}
        for item in candidates:
            text = item.get('text_content', '')
            if text:
                first_by_text.setdefault(text, item)
        return list(first_by_text.values())

    def _recall_candidates(self, search_queries, collection_name):
        """Run (or fetch from cache) the recall for every query and merge the results."""
        merged = []
        for query in search_queries:
            cache_key = self._get_cache_key(query)
            if cache_key in self._search_cache:
                query_results = self._search_cache[cache_key]
                server_logger.info(f"[缓存命中] search_query: {query[:30]}...")
            else:
                query_results = self._run_coroutine_sync(
                    retrieval_manager.async_multi_stage_recall(
                        collection_name=collection_name,
                        query_text=query,
                        hybrid_top_k=self.HYBRID_TOP_K,
                        top_k=self.RECALL_TOP_K,
                    )
                ) or []  # tolerate a recall that returns None
                self._search_cache[cache_key] = query_results
                server_logger.info(
                    f"[检索完成] search_query: {query[:30]}... "
                    f"召回 {len(query_results)} 个候选"
                )
            merged.extend(query_results)
        return merged

    def _retrieve_for_point(self, point_idx, point, collection_name):
        """Run the full pipeline for one review point and return its result list."""
        # Accept both the current and the legacy field names.
        label = point.get('label', point.get('entity', ''))
        search_queries = self._normalize_queries(
            point.get('search_queries', point.get('search_keywords', []))
        )
        original_text = point.get('original_text', point.get('background', '')) or ''

        server_logger.info(
            f"正在处理审查要点 [{point_idx}]: {label}, "
            f"检索语句数: {len(search_queries)}, "
            f"原文长度: {len(original_text)}"
        )

        # Step 1: recall + first rerank for every search query.
        all_candidates = self._recall_candidates(search_queries, collection_name)
        if not all_candidates:
            server_logger.warning(f"审查要点 '{label}' 所有检索语句均无结果")
            return []

        # Step 2: de-duplicate on text_content.
        unique_candidates = self._dedupe_by_text(all_candidates)
        server_logger.info(
            f"审查要点 '{label}': 合并 {len(all_candidates)} 个候选, "
            f"去重后 {len(unique_candidates)} 个"
        )

        # Step 3: keep only high-scoring candidates (a missing/None score counts as 0).
        high_score = [
            c for c in unique_candidates
            if (c.get('rerank_score') or 0) > self.SCORE_THRESHOLD
        ]
        if not high_score:
            # Nothing relevant enough: the review point yields no results.
            max_score = max(
                ((c.get('rerank_score') or 0) for c in unique_candidates),
                default=0,
            )
            server_logger.warning(
                f"审查要点 '{label}': 无高分候选(>{self.SCORE_THRESHOLD}), 共 {len(unique_candidates)} 个候选均低于阈值, "
                f"最高分={max_score:.4f}, 跳过该审查要点"
            )
            return []

        # Step 4: second rerank anchored on the original excerpt, when it is
        # long enough and there is more than one candidate to order.
        if len(original_text) > self.MIN_ANCHOR_LENGTH and len(high_score) > 1:
            final_results = self._secondary_rerank(
                original_text, high_score, top_k=self.FINAL_TOP_K
            )
            server_logger.info(
                f"审查要点 '{label}': 二次重排序完成, "
                f"返回 {len(final_results)} 个结果"
            )
        else:
            final_results = high_score[:self.FINAL_TOP_K]
            server_logger.info(
                f"审查要点 '{label}': 跳过二次重排序 "
                f"(原文长度={len(original_text)}, 候选数={len(high_score)})"
            )

        # Step 5: tag each result with its review point (backward compat).
        # Tag copies, never the originals: the candidate dicts may be shared
        # with the search cache and with other review points' results, so an
        # in-place write would relabel those as well.
        tagged_results = []
        for result in final_results:
            tagged = dict(result)
            tagged['source_entity'] = label
            tagged_results.append(tagged)
        return tagged_results

    @track_execution_time
    def review_point_retrieval(self, review_points):
        """Retrieve specification clauses for every review point.

        Replaces ``entities_enhance_retrieval``.

        Args:
            review_points: list of review-point dicts. Fields read here:
                ``label`` (legacy: ``entity``), ``search_queries`` (legacy:
                ``search_keywords``) and ``original_text`` (legacy:
                ``background``). ``None`` is treated as an empty list.

        Returns:
            A list with one sub-list of result dicts per review point, in
            input order. A review point with no usable candidates gets an
            empty sub-list. The same list is stored in ``self.result_lists``.
        """
        # Accumulate in a local list so overlapping calls on the shared
        # module-level instance cannot append into each other's results.
        results = []
        self.result_lists = results
        children_collection = self._get_children_collection()
        for point_idx, point in enumerate(review_points or []):
            results.append(
                self._retrieve_for_point(point_idx, point, children_collection)
            )
        return results

    def _secondary_rerank(self, original_text, candidates, top_k=5):
        """Rerank *candidates* using *original_text* as the query.

        The verbatim excerpt (rather than an entity description or an
        LLM-written summary) is the query, so the returned clauses align
        with the wording of the construction plan.

        Args:
            original_text: excerpt used as the rerank query.
            candidates: candidate dicts; ``text_content`` is the text reranked.
            top_k: number of results requested from the reranker.

        Returns:
            Copies of the matched candidates in reranker order, each with
            ``bfp_rerank_score`` and ``bfp_rerank_parent_id`` added. If no
            candidate has text, or the reranker raises, the first ``top_k``
            candidates are returned unchanged.
        """
        # Group candidates by text; dict order keeps first-seen order.
        items_by_text = {}
        for item in candidates:
            items_by_text.setdefault(item.get('text_content', ''), []).append(item)

        candidate_texts = [text for text in items_by_text if text]
        if not candidate_texts:
            return candidates[:top_k]

        try:
            rerank_results = retrieval_manager._get_rerank_results(
                original_text, candidate_texts, top_k
            )
        except Exception as e:
            # Deliberate best effort: a reranker failure degrades to the
            # first-pass ordering instead of failing the whole retrieval.
            server_logger.error(f"二次重排序失败: {e}")
            return candidates[:top_k]

        # Map the rerank scores back onto the original candidate dicts.
        final_results = []
        added_texts = set()
        for rerank_item in rerank_results or []:
            text = rerank_item.get('text', '')
            if text in added_texts or text not in items_by_text:
                continue
            # A None score is treated as 0, matching the threshold filter.
            best_candidate = max(
                items_by_text[text],
                key=lambda c: c.get('rerank_score') or 0.0,
            )
            result_item = best_candidate.copy()
            # Second-pass rerank score (key name kept for backward compat).
            result_item['bfp_rerank_score'] = rerank_item.get('score', 0.0)
            result_item['bfp_rerank_parent_id'] = (
                result_item.get('metadata') or {}
            ).get('parent_id', '')
            final_results.append(result_item)
            added_texts.add(text)
        return final_results

    def clear_cache(self):
        """Empty the search-result cache."""
        self._search_cache.clear()
        server_logger.info("[缓存清理] 审查要点检索缓存已清空")

    # Backward compatibility: legacy callers of entities_enhance_retrieval
    # are forwarded to the new entry point.
    def entities_enhance_retrieval(self, query_pairs):
        """Legacy entry point; forwards to ``review_point_retrieval``."""
        return self.review_point_retrieval(query_pairs)
# Module-level shared instance, exposed under its current name.
review_point_retriever = ReviewPointRetriever()

# Backward compatibility: legacy code that imports ``entity_enhance`` keeps
# working and receives the same instance.
entity_enhance = review_point_retriever
|