import asyncio
import json
import re
import time
from typing import List, Dict, Any, Optional

from foundation.ai.models.rerank_model import rerank_model
from foundation.observability.monitoring.time_statistics import track_execution_time
from foundation.infrastructure.config.config import config_handler
from foundation.observability.logger.loggering import server_logger
from foundation.database.base.vector.milvus_vector import MilvusVectorManager


class RetrievalManager:
    """Retrieval manager implementing multi-stage (multi-path) recall."""

    def __init__(self):
        """Initialize the retrieval manager."""
        self.vector_manager = MilvusVectorManager()
        self.logger = server_logger
        self.dense_weight = config_handler.get('hybrid_search', 'DENSE_WEIGHT', 0.7)
        self.sparse_weight = config_handler.get('hybrid_search', 'SPARSE_WEIGHT', 0.3)
        # Rerank model configuration (managed centrally under the [model] section)
        self.rerank_model_type = config_handler.get('model', 'RERANK_MODEL_TYPE', 'bge_rerank_model')
        self.logger.info(f"Initialized rerank model type: {self.rerank_model_type}")

    def set_rerank_model(self, model_type: str):
        """
        Set the rerank model type.

        Args:
            model_type: config section name ('bge_rerank_model', 'lq_rerank_model', 'silicoflow_rerank_model')
        """
        valid_models = ['bge_rerank_model', 'lq_rerank_model', 'silicoflow_rerank_model']
        if model_type not in valid_models:
            raise ValueError(f"model_type must be one of {valid_models}")
        self.rerank_model_type = model_type
        self.logger.info(f"Rerank model type set to: {model_type}")

    def _clean_document(self, doc: str) -> str:
        """
        Clean document text by stripping HTML tags and special characters.

        Args:
            doc: raw document text

        Returns:
            str: cleaned document text
        """
        if not isinstance(doc, str):
            self.logger.debug(f"Converting document type: {type(doc)} -> str")
            return str(doc)

        original_length = len(doc)
        # Remove HTML tags
        doc = re.sub(r'<[^>]+>', '', doc)
        # Collapse redundant whitespace
        doc = re.sub(r'\s+', ' ', doc)
        # Permissive character filter - keep CJK, word characters, and common punctuation
        doc = re.sub(r'[^\u4e00-\u9fff\w\s.,;:!?()()。,;:!?\-\+\=\*/%&@#¥$【】「」""''""\n\r]', '', doc)
        # Truncate overly long text
        if len(doc) > 8000:  # maximum length limit
            doc = doc[:8000] + "..."
        cleaned_doc = doc.strip()
        self.logger.debug(f"Document cleaned: {original_length} -> {len(cleaned_doc)} chars")
        return cleaned_doc

    def _get_rerank_results(self, query_text: str, documents: List[str], top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Pick the configured rerank model and run reranking.

        Args:
            query_text: query text
            documents: list of documents
            top_k: number of results to return

        Returns:
            List[Dict]: reranked result list
        """
        try:
            # Clean and validate the document list
            cleaned_documents = []
            valid_original_docs = []
            for doc in documents:
                if doc and isinstance(doc, str) and doc.strip():
                    cleaned_doc = self._clean_document(doc)
                    if cleaned_doc and len(cleaned_doc) > 3:
                        cleaned_documents.append(cleaned_doc)
                        valid_original_docs.append(doc)
            if not cleaned_documents:
                return []

            # Route to the reranker implementation matching the configured section name
            if self.rerank_model_type == 'lq_rerank_model':
                self.logger.info("Reranking with local Qwen3-Reranker-8B (lq_rerank_model)")
                rerank_results = rerank_model.lq_rerank(query_text, cleaned_documents, top_k)
            elif self.rerank_model_type == 'silicoflow_rerank_model':
                self.logger.info("Reranking with SiliconFlow Qwen3-Reranker-8B (silicoflow_rerank_model)")
                rerank_results = rerank_model.qwen3_rerank(query_text, cleaned_documents, top_k)
            else:  # bge_rerank_model (default)
                self.logger.info("Reranking with BGE Reranker (bge_rerank_model)")
                rerank_results = rerank_model.bge_rerank(query_text, cleaned_documents, top_k)

            # Map the cleaned text back to the original text (needed for every reranker)
            for result in rerank_results:
                cleaned_text = result.get('text', '')
                # Look up the original text
                for i, cleaned in enumerate(cleaned_documents):
                    if cleaned == cleaned_text:
                        result['text'] = valid_original_docs[i]
                        break
                # Normalize field names: expose relevance_score as score
                if 'relevance_score' in result and 'score' not in result:
                    result['score'] = float(result['relevance_score'])
            return rerank_results
        except Exception as e:
            self.logger.error(f"Rerank failed, model type: {self.rerank_model_type}, error: {str(e)}")
            # Fall back to the original document order
            return [{"text": doc, "score": 0.0} for doc in documents[:top_k]]

    @track_execution_time
    async def entity_recall(self, main_entity: str, assisted_search_entity: list, recall_top_k: int = 5,
                            max_results: Optional[int] = None) -> List[str]:
        """
        Run entity recall.

        Args:
            main_entity: main query entity
            assisted_search_entity: list of auxiliary search entities
            recall_top_k: number of results returned per single-entity recall (default 5)
            max_results: maximum number of results to return; None returns all recalled results (default None)

        Returns:
            List[str]: list of entity text contents

        Note:
            Actual count returned = min(max_results, main-entity hits + all auxiliary-entity hits).
            Without max_results the result set can be large, depending on the number of auxiliary entities.
        """
        self.logger.info(
            f"[entity_recall] starting recall, recall_top_k={recall_top_k}, max_results={max_results}, "
            f"main entity='{main_entity}', auxiliary entity count={len(assisted_search_entity)}"
        )
        collection_name = "first_bfp_collection_entity"

        # Main-entity search - async path
        entity_result = await self.async_multi_stage_recall(
            collection_name=collection_name,
            query_text=main_entity,
            hybrid_top_k=50,
            top_k=recall_top_k
        )
        self.logger.info(f"[entity_recall] main-entity recall done, {len(entity_result)} results")

        assist_tasks = [
            self.async_multi_stage_recall(
                collection_name=collection_name,
                query_text=assist_entity,
                hybrid_top_k=50,
                top_k=recall_top_k
            )
            for assist_entity in assisted_search_entity
        ]
        # Auxiliary searches run concurrently
        assist_results_list = await asyncio.gather(*assist_tasks, return_exceptions=True)
        assist_results = []
        for res in assist_results_list:
            if isinstance(res, Exception):
                self.logger.error(f"Auxiliary entity recall failed: {str(res)}")
            else:
                assist_results.extend(res)
        all_results = entity_result + assist_results

        # if self.rerank_model_type == 'silicoflow_rerank_model':
        #     with open("temp/entity_bfp_recall/silicoflow_rerank_model.json", "w", encoding="utf-8") as f:
        #         json.dump(all_results, f, ensure_ascii=False, indent=4)
        # elif self.rerank_model_type == 'lq_rerank_model':
        #     with open("temp/entity_bfp_recall/lq_rerank_model.json", "w", encoding="utf-8") as f:
        #         json.dump(all_results, f, ensure_ascii=False, indent=4)

        # Deduplicate and extract the text content
        entity_list = list(set(item['text_content'] for item in all_results))
        # Truncate if max_results is set
        if max_results is not None and len(entity_list) > max_results:
            entity_list = entity_list[:max_results]
            self.logger.info(f"[entity_recall] results truncated to max_results={max_results}")
        self.logger.info(f"entity_list_len: {len(entity_list)}")
        return entity_list

    @track_execution_time
    async def async_bfp_recall(self, entity_list: List[str], background: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """
        Hybrid-search recall - vector + BM25, followed by a second rerank pass.

        Args:
            entity_list: list of entities
            background: background/context text used for the second rerank pass
            top_k: number of results to return
        """
        start_time = time.time()
        self.logger.info(
            f"[async_bfp_recall] starting recall, top_k={top_k}, entity count={len(entity_list)}, "
            f"background='{background[:50]}...'"
        )
        # Recall the supporting-basis documents concurrently
        collection_name = "rag_children_hybrid"
        gather_start = time.time()
        # Optimization: a lower hybrid_top_k (down from 50) shortens hybrid search time
        bfp_tasks = [
            self.async_multi_stage_recall(
                collection_name=collection_name,
                query_text=entity,
                hybrid_top_k=10,  # lowered from 50 to reduce hybrid search time
                top_k=top_k
            )
            for entity in entity_list
        ]
        bfp_tasks_list = await asyncio.gather(*bfp_tasks, return_exceptions=True)
        gather_end = time.time()
        bfp_results = []
        for res in bfp_tasks_list:
            if isinstance(res, Exception):
                self.logger.error(f"Entity recall failed: {str(res)}")
            else:
                bfp_results.extend(res)
        self.logger.info(f"[async_bfp_recall] first-stage recall done, {len(bfp_results)} documents recalled")

        # BFP recall results were already reranked inside multi_stage_recall, so keep their order.
        # Only documents whose first-pass rerank score exceeds 0.8 go through the second rerank pass.
        high_score_results = [item for item in bfp_results if (item.get('rerank_score') or 0) > 0.8]
        low_score_results = [item for item in bfp_results if (item.get('rerank_score') or 0) <= 0.8]
        self.logger.info(
            f"Filtering: {len(high_score_results)} high-score documents (>0.8), "
            f"{len(low_score_results)} low-score documents (<=0.8)"
        )

        # With no high-score document, return top_k results sorted by hybrid_similarity
        if not high_score_results:
            self.logger.info(
                f"No document scored above 0.8; skipping the second rerank and returning "
                f"top_k={top_k} results sorted by hybrid_similarity"
            )
            sorted_results = sorted(bfp_results, key=lambda x: x.get('hybrid_similarity') or 0, reverse=True)
            return sorted_results[:top_k]

        # An empty background also skips the second rerank
        if not background or not background.strip():
            self.logger.warning("background is empty; skipping the second rerank and returning high-score documents")
            return high_score_results

        # Extract high-score document texts for the second rerank (order-preserving dedup)
        seen_texts = set()
        high_score_text_content = []
        for item in high_score_results:
            text = item['text_content']
            if text not in seen_texts:
                seen_texts.add(text)
                high_score_text_content.append(text)
        self.logger.info(f"Extracted {len(high_score_text_content)} high-score document texts for the second rerank")

        # Second rerank pass - uses the configured rerank model
        rerank_start = time.time()
        # Use the caller-supplied top_k instead of a hard-coded 5
        bfp_rerank_result = self._get_rerank_results(background, high_score_text_content, top_k)
        rerank_end = time.time()
        self.logger.info(f"Second rerank took {rerank_end - rerank_start:.3f}s, top_k={top_k}")

        # Reassemble results according to the rerank order
        reorganize_start = time.time()
        final_results = []
        # Map text_content -> list of source documents (keep every match)
        text_to_items = {}
        for item in high_score_results:
            text = item['text_content']
            if text not in text_to_items:
                text_to_items[text] = []
            text_to_items[text].append(item)

        # Process the reranked high-score documents
        added_texts = set()  # tracks texts already added, to avoid duplicates
        for rerank_item in bfp_rerank_result:
            text = rerank_item.get('text', '')
            parent_id = rerank_item.get('parent_id', '')
            score = rerank_item.get('score', 0.0)
            if text in text_to_items and text not in added_texts:
                # Among all candidates with this text, pick the one with the highest rerank_score
                candidates = text_to_items[text]
                best_candidate = max(candidates, key=lambda x: x.get('rerank_score', 0.0))
                result_item = best_candidate.copy()
                result_item['bfp_rerank_score'] = score
                result_item['bfp_rerank_parent_id'] = parent_id
                final_results.append(result_item)
                added_texts.add(text)  # mark this text as added

        reorganize_end = time.time()
        total_time = reorganize_end - start_time
        self.logger.info(f"Result reassembly took {reorganize_end - reorganize_start:.3f}s")
        self.logger.info(
            f"Second rerank done: returning {len(final_results)} high-score documents (top_k={top_k}), "
            f"dropping {len(low_score_results)} low-score documents"
        )
        self.logger.info(
            f"[async_bfp_recall] total time: {total_time:.3f}s "
            f"(recall: {gather_end - gather_start:.3f}s + rerank: {rerank_end - rerank_start:.3f}s + "
            f"other: {total_time - (gather_end - gather_start) - (rerank_end - rerank_start):.3f}s)"
        )
        return final_results

    def hybrid_search_recall(self, collection_name: str, query_text: str, top_k: int = 10,
                             ranker_type: str = "weighted", dense_weight: float = 0.7,
                             sparse_weight: float = 0.3) -> List[Dict[str, Any]]:
        """
        Hybrid-search recall - vector + BM25.

        Args:
            collection_name: collection name
            query_text: query text
            top_k: number of results to return
            ranker_type: fusion ranker type, "weighted" or "rrf"
            dense_weight: dense-vector weight
            sparse_weight: sparse-vector weight

        Returns:
            List[Dict]: search result list
        """
        try:
            self.logger.info("Starting hybrid search")
            param = {'collection_name': collection_name}
            # Call the synchronous hybrid search directly (this is a sync method)
            results = self.vector_manager.hybrid_search(
                param=param,
                query_text=query_text,
                top_k=top_k,
                ranker_type=ranker_type,
                dense_weight=dense_weight,
                sparse_weight=sparse_weight
            )
            # Record the hybrid search outcome
            self.logger.info(f"Hybrid search recall returned {len(results)} results")
            return results
        except Exception as e:
            self.logger.error(f"Hybrid search recall failed: {str(e)}")
            return []

    def rerank_recall(self, candidates_with_metadata: List[Dict[str, Any]], query_text: str,
                      top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Rerank recall - reorder candidate documents with the configured rerank model.

        Args:
            candidates_with_metadata: candidate documents including text content and metadata
            query_text: query text
            top_k: number of results to return

        Returns:
            List[Dict]: reranked result list including original index information
        """
        try:
            # Step 1: deduplicate on the combination of text content + metadata
            unique_candidates = []
            original_indices_map = []  # for each unique candidate, the list of original indices it covers
            unique_combinations = set()  # text + metadata combinations seen so far
            for original_index, candidate in enumerate(candidates_with_metadata):
                text_content = candidate.get('text_content', '')
                metadata = candidate.get('metadata', {})
                # Handle nested metadata stored as a JSON string
                title = ''
                file = ''
                if 'metadata' in metadata and isinstance(metadata['metadata'], str):
                    try:
                        # Parse the JSON-formatted metadata
                        inner_metadata = json.loads(metadata['metadata'])
                        title = inner_metadata.get('title', '')
                        file = inner_metadata.get('file', '')
                    except (json.JSONDecodeError, TypeError):
                        pass
                else:
                    title = metadata.get('title', '')
                    file = metadata.get('file', '')
                # Build the combination key: text content + key metadata
                combination_key = (text_content, title, file)
                if combination_key not in unique_combinations:
                    # New unique combination
                    unique_candidates.append(candidate)
                    original_indices_map.append([original_index])
                    unique_combinations.add(combination_key)
                else:
                    # Find the matching unique candidate and record this index
                    for unique_idx, unique_candidate in enumerate(unique_candidates):
                        if unique_candidate.get('text_content', '') == text_content:
                            # Parse the unique candidate's metadata
                            unique_metadata = unique_candidate.get('metadata', {})
                            unique_title = ''
                            unique_file = ''
                            if 'metadata' in unique_metadata and isinstance(unique_metadata['metadata'], str):
                                try:
                                    inner_metadata = json.loads(unique_metadata['metadata'])
                                    unique_title = inner_metadata.get('title', '')
                                    unique_file = inner_metadata.get('file', '')
                                except (json.JSONDecodeError, TypeError):
                                    pass
                            else:
                                unique_title = unique_metadata.get('title', '')
                                unique_file = unique_metadata.get('file', '')
                            if unique_title == title and unique_file == file:
                                original_indices_map[unique_idx].append(original_index)
                                break

            # Extract the unique candidates' text content for reranking
            unique_texts = [candidate.get('text_content', '') for candidate in unique_candidates]
            # Rerank with the configured rerank model
            rerank_results = self._get_rerank_results(query_text, unique_texts, top_k)

            # Convert the result format, using the index map to recover original indices
            scored_docs = []
            for i, api_result in enumerate(rerank_results):
                rerank_text = api_result.get('text', '')
                rerank_score = float(api_result.get('score', 0.0))
                # Look up rerank_text in unique_candidates
                # (reranking changes the order, so index i cannot be used directly)
                found_index = None
                original_candidate = None
                for idx, candidate in enumerate(unique_candidates):
                    if candidate.get('text_content', '') == rerank_text:
                        found_index = idx
                        original_candidate = candidate
                        break
                if original_candidate is None:
                    self.logger.warning(f"[rerank_recall] no matching candidate found, skipping: {rerank_text[:50]}...")
                    continue
                # Map back to the first original index via the found index
                original_index = original_indices_map[found_index][0] if found_index < len(original_indices_map) else i
                # Keep the original hybrid-search scores
                hybrid_distance = original_candidate.get('distance', 0.0)
                hybrid_similarity = original_candidate.get('similarity', 0.0)
                # Parse metadata to get the title for logging
                metadata = original_candidate.get('metadata', {})
                title = 'N/A'
                if 'metadata' in metadata and isinstance(metadata['metadata'], str):
                    try:
                        inner_metadata = json.loads(metadata['metadata'])
                        title = inner_metadata.get('title', 'N/A')
                    except (json.JSONDecodeError, TypeError):
                        pass
                scored_docs.append({
                    'text_content': rerank_text,
                    'metadata': original_candidate.get('metadata', {}),  # keep the original metadata
                    'rerank_score': rerank_score,
                    'original_index': original_index,
                    'rerank_rank': i,
                    'duplicate_count': len(original_indices_map[found_index]),  # number of duplicates
                    'hybrid_distance': hybrid_distance,  # keep the original hybrid-search score
                    'hybrid_similarity': hybrid_similarity
                })
                # Dual-score logging
                # self.logger.info(f"Rerank score #{i+1}: title='{title}' | hybrid similarity={hybrid_similarity:.4f} | rerank score={rerank_score:.6f}")
            return scored_docs
        except Exception as e:
            self.logger.error(f"Rerank recall failed: {str(e)}")
            return []

    def multi_stage_recall(self, collection_name: str, query_text: str, hybrid_top_k: int = 50,
                           top_k: int = 10, ranker_type: str = "weighted") -> List[Dict[str, Any]]:
        """
        Multi-stage recall - hybrid search first, then rerank; only reranked results are returned.

        Args:
            collection_name: collection name
            query_text: query text
            hybrid_top_k: number of documents recalled by hybrid search
            top_k: number of documents finally returned
            ranker_type: fusion ranker type for hybrid search

        Returns:
            List[Dict]: reranked result list carrying only rerank scores
        """
        try:
            self.logger.info("Running multi-stage recall")
            # Stage 1: hybrid search recall (vector + BM25)
            hybrid_results = self.hybrid_search_recall(
                collection_name=collection_name,
                query_text=query_text,
                top_k=hybrid_top_k,
                ranker_type=ranker_type
            )
            if not hybrid_results:
                self.logger.warning("Hybrid search recall returned nothing; returning an empty list")
                return []
            # Stage 2: rerank recall, passing the full hybrid results (with metadata)
            rerank_results = self.rerank_recall(
                candidates_with_metadata=hybrid_results,
                query_text=query_text,
                top_k=top_k
            )
            # Tidy up the metadata structure of the reranked results
            final_results = []
            for rerank_result in rerank_results:
                metadata = rerank_result.get('metadata', {}).copy()
                duplicate_count = rerank_result.get('duplicate_count', 1)
                # Lift the nested metadata field to the outer level if present
                if 'metadata' in metadata and isinstance(metadata['metadata'], str):
                    try:
                        # Parse the JSON-formatted metadata
                        inner_metadata = json.loads(metadata['metadata'])
                        metadata.update(inner_metadata)
                        # Drop the inner metadata string to avoid duplication
                        del metadata['metadata']
                    except (json.JSONDecodeError, TypeError):
                        # Keep as-is when parsing fails
                        pass
                # Drop the duplicated content field
                if 'content' in metadata:
                    del metadata['content']
                # Record the duplicate count in the metadata
                if duplicate_count > 1:
                    metadata['duplicate_count'] = duplicate_count
                # Emit the tidied result with both scores
                final_result = {
                    'text_content': rerank_result['text_content'],
                    'metadata': metadata,
                    'hybrid_similarity': rerank_result.get('hybrid_similarity', 0.0),  # hybrid-search similarity
                    'rerank_score': rerank_result.get('rerank_score', 0.0)  # rerank score
                }
                final_results.append(final_result)
self.logger.debug(f"元数据优化完成: 重排序排名{rerank_result.get('rerank_rank')}, 重复数量={duplicate_count}") return final_results except Exception as e: self.logger.error(f"多路召回失败: {str(e)}") return [] async def async_multi_stage_recall(self, collection_name: str, query_text: str, hybrid_top_k: int = 50, top_k: int = 10, ranker_type: str = "weighted") -> List[Dict[str, Any]]: """ 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果 Args: collection_name: 集合名称 query_text: 查询文本 hybrid_top_k: 混合搜索召回的文档数量 top_k: 最终返回的文档数量 ranker_type: 混合搜索的重排序类型 Returns: List[Dict]: 重排序后的结果列表,只包含重排序分数 """ import time try: start_time = time.time() # 第一阶段:混合搜索召回(向量+BM25) hybrid_results = await asyncio.to_thread( self.hybrid_search_recall, collection_name=collection_name, query_text=query_text, top_k=hybrid_top_k, ranker_type=ranker_type ) if not hybrid_results: return [] # 第二阶段:重排序召回 rerank_results = self.rerank_recall( candidates_with_metadata=hybrid_results, query_text=query_text, top_k=top_k ) # 优化重排序结果的元数据结构 final_results = [] for rerank_result in rerank_results: metadata = rerank_result.get('metadata', {}).copy() duplicate_count = rerank_result.get('duplicate_count', 1) # 如果内层有metadata字段,将其提取到外层 if 'metadata' in metadata and isinstance(metadata['metadata'], str): import json try: # 解析JSON格式的metadata inner_metadata = json.loads(metadata['metadata']) metadata.update(inner_metadata) # 移除内层的metadata字符串,避免重复 del metadata['metadata'] except (json.JSONDecodeError, TypeError): # 如果解析失败,保持原样 pass # 移除重复的content字段 if 'content' in metadata: del metadata['content'] # 添加重复计数信息到元数据中 if duplicate_count > 1: metadata['duplicate_count'] = duplicate_count # 输出优化后的结果,包含双重评分 final_result = { 'text_content': rerank_result['text_content'], 'metadata': metadata, 'hybrid_similarity': rerank_result.get('hybrid_similarity', 0.0), # 混合搜索相似度 'rerank_score': rerank_result.get('rerank_score', 0.0) # BGE重排序评分 } final_results.append(final_result) self.logger.debug(f"元数据优化完成: 重排序排名{rerank_result.get('rerank_rank')}, 重复数量={duplicate_count}") return final_results except Exception as e: self.logger.error(f"多路召回失败: {str(e)}") return [] # 创建全局召回管理器实例 retrieval_manager = RetrievalManager()