| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608 |
import asyncio
import json
import re
import time
from typing import List, Dict, Any, Optional

from foundation.ai.models.rerank_model import rerank_model
from foundation.database.base.vector.milvus_vector import MilvusVectorManager
from foundation.infrastructure.config.config import config_handler
from foundation.observability.logger.loggering import server_logger
from foundation.observability.monitoring.time_statistics import track_execution_time
class RetrievalManager:
    """
    Recall manager implementing multi-channel (hybrid + rerank) retrieval.
    """

    def __init__(self):
        """Initialize the recall manager: vector store, logger, fusion weights and rerank model choice."""
        self.vector_manager = MilvusVectorManager()
        self.logger = server_logger
        # Fusion weights for hybrid search (dense vector vs. sparse/BM25).
        self.dense_weight = config_handler.get('hybrid_search', 'DENSE_WEIGHT', 0.7)
        self.sparse_weight = config_handler.get('hybrid_search', 'SPARSE_WEIGHT', 0.3)
        # Which rerank model to use: 'bge' or 'qwen3'.
        self.rerank_model_type = config_handler.get('retrieval', 'RERANK_MODEL_TYPE', 'bge')
        self.logger.info(f"初始化重排序模型类型: {self.rerank_model_type}")
- def set_rerank_model(self, model_type: str):
- """
- 设置重排序模型类型
- Args:
- model_type: 模型类型 ('bge' 或 'qwen3')
- """
- if model_type not in ['bge', 'qwen3']:
- raise ValueError("model_type 必须是 'bge' 或 'qwen3'")
- self.rerank_model_type = model_type
- self.logger.info(f"重排序模型类型已设置为: {model_type}")
- def _clean_document(self, doc: str) -> str:
- """
- 清理文档文本,移除HTML标签和特殊字符
- Args:
- doc: 原始文档文本
- Returns:
- str: 清理后的文档文本
- """
- if not isinstance(doc, str):
- self.logger.debug(f"文档类型转换: {type(doc)} -> str")
- return str(doc)
- original_length = len(doc)
- # 移除HTML标签
- import re
- doc = re.sub(r'<[^>]+>', '', doc)
- # 移除多余的空白字符
- doc = re.sub(r'\s+', ' ', doc)
- # 更宽松的字符过滤 - 保留更多字符
- doc = re.sub(r'[^\u4e00-\u9fff\w\s.,;:!?()()。,;:!?\-\+\=\*/%&@#¥$【】「」""''""\n\r]', '', doc)
- # 截断过长的文本
- if len(doc) > 8000: # 设置最大长度限制
- doc = doc[:8000] + "..."
- cleaned_doc = doc.strip()
- self.logger.debug(f"文档清理: {original_length} -> {len(cleaned_doc)} 字符")
- return cleaned_doc
- def _get_rerank_results(self, query_text: str, documents: List[str], top_k: int = None) -> List[Dict[str, Any]]:
- """
- 根据配置选择重排序模型并执行重排序
- Args:
- query_text: 查询文本
- documents: 文档列表
- top_k: 返回结果数量
- Returns:
- List[Dict]: 重排序后的结果列表
- """
- try:
- # 清理和验证文档列表
- cleaned_documents = []
- valid_original_docs = []
- for doc in documents:
- if doc and isinstance(doc, str) and doc.strip():
- cleaned_doc = self._clean_document(doc)
- if cleaned_doc and len(cleaned_doc) > 3:
- cleaned_documents.append(cleaned_doc)
- valid_original_docs.append(doc)
- if not cleaned_documents:
- return []
- if self.rerank_model_type == 'qwen3':
- self.logger.info("使用 Qwen3-Reranker-8B 进行重排序")
- rerank_results = rerank_model.qwen3_rerank(query_text, cleaned_documents, top_k)
- # 将清理后的文本映射回原始文本
- for result in rerank_results:
- cleaned_text = result.get('text', '')
- # 查找原始文本
- for i, cleaned in enumerate(cleaned_documents):
- if cleaned == cleaned_text:
- result['text'] = valid_original_docs[i]
- break
- return rerank_results
- else:
- self.logger.info("使用 BGE Reranker 进行重排序")
- rerank_results = rerank_model.bge_rerank(query_text, cleaned_documents, top_k)
- # 将清理后的文本映射回原始文本
- for result in rerank_results:
- cleaned_text = result.get('text', '')
- # 查找原始文本
- for i, cleaned in enumerate(cleaned_documents):
- if cleaned == cleaned_text:
- result['text'] = valid_original_docs[i]
- break
- return rerank_results
- except Exception as e:
- self.logger.error(f"重排序失败,模型类型: {self.rerank_model_type}, 错误: {str(e)}")
- # 返回原始顺序作为fallback
- return [{"text": doc, "score": 0.0} for i, doc in enumerate(documents[:top_k])]
- @track_execution_time
- async def entity_recall(self, main_entity: str,assisted_search_entity: list,
- top_k: int = 5) -> List[Dict[str, Any]]:
- """
- 执行实体召回
- :param main_entity: 查询实体
- :param assisted_search_entity: 辅助搜索实体
- :param top_k: 返回结果数量
- """
- collection_name = "first_bfp_collection_entity"
- # 主实体搜索 - 使用异步方法
- entity_result = await self.async_multi_stage_recall(
- collection_name=collection_name,
- query_text=main_entity,
- hybrid_top_k=20, # 从默认50降到20
- top_k=top_k
- )
- assist_tasks = [
- self.async_multi_stage_recall(
- collection_name=collection_name,
- query_text=assisted_search_entity,
- hybrid_top_k=20, # 从默认50降到20
- top_k=top_k
- ) for assisted_search_entity in assisted_search_entity
- ]
- # 辅助搜索,异步并发
- assist_results_list = await asyncio.gather(*assist_tasks,return_exceptions=True)
- assist_results = []
- for res in assist_results_list:
- if isinstance(res, Exception):
- self.logger.error(f"辅助实体召回失败: {str(res)}")
- else:
- assist_results.extend(res)
- all_results = entity_result + assist_results
- entity_list = list(set([item['text_content'] for item in all_results]))
- self.logger.info(f"entity_list:{entity_list}")
- return entity_list
-
- @track_execution_time
- async def async_bfp_recall(self, entity_list: List[str],background: str ,
- top_k: int = 3,) -> List[Dict[str, Any]]:
- """
- 混合搜索召回 - 向量+BM25召回
- Args:
- entity_list: 实体列表
- background: 背景/上下文信息,用于二次重排
- top_k: 返回结果数量
- """
- import time
- start_time = time.time()
- # 异步并发召回编制依据
- collection_name = "first_bfp_collection_test"
- gather_start = time.time()
- # 优化:降低hybrid_top_k参数从50到20,减少混合搜索时间
- bfp_tasks = [
- self.async_multi_stage_recall(
- collection_name=collection_name,
- query_text=entity,
- hybrid_top_k=20, # 从50降到20,减少60%的混合搜索时间
- top_k=top_k
- ) for entity in entity_list
- ]
- bfp_tasks_list = await asyncio.gather(*bfp_tasks,return_exceptions=True)
- gather_end = time.time()
- gather_time = gather_end - gather_start
- bfp_results = []
- for res in bfp_tasks_list:
- if isinstance(res, Exception):
- self.logger.error(f"辅助实体召回失败: {str(res)}")
- else:
- bfp_results.extend(res)
- # BFP召回结果已经通过multi_stage_recall进行了重排序,保持原有顺序
- # 只对第一次重排序得分大于0.8的文档进行二次重排序
- high_score_results = [item for item in bfp_results if item.get('rerank_score', 0) > 0.8]
- low_score_results = [item for item in bfp_results if item.get('rerank_score', 0) <= 0.8]
- self.logger.info(f"筛选结果:高分文档(>0.8) {len(high_score_results)} 个,低分文档(≤0.8) {len(low_score_results)} 个")
- # 如果没有高分文档,直接返回原始结果
- if not high_score_results:
- self.logger.info("没有得分大于0.8的文档,跳过二次重排序,直接返回原始结果")
- return bfp_results
- # 提取高分文档的文本内容用于二次重排
- high_score_text_content = list(set([item['text_content'] for item in high_score_results]))
- self.logger.info(f"提取高分文档文本内容,共 {len(high_score_text_content)} 个,准备二次重排")
- # 二次重排 - 使用配置的重排序模型
- rerank_start = time.time()
- bfp_rerank_result = self._get_rerank_results(background, high_score_text_content, 5)
- rerank_end = time.time()
- self.logger.info(f"二次重排序耗时: {rerank_end - rerank_start:.3f}秒")
- # 根据重排结果重新组织数据
- reorganize_start = time.time()
- final_results = []
- text_to_metadata = {item['text_content']: item for item in high_score_results}
- # 处理二次重排序的高分文档
- for rerank_item in bfp_rerank_result:
- text = rerank_item.get('text', '')
- score = rerank_item.get('score', 0.0)
- if text in text_to_metadata:
- original_item = text_to_metadata[text].copy()
- original_item['bfp_rerank_score'] = score
- final_results.append(original_item)
- reorganize_end = time.time()
- total_time = reorganize_end - start_time
- self.logger.info(f"结果重组耗时: {reorganize_end - reorganize_start:.3f}秒")
- self.logger.info(f"二次重排完成,返回 {len(final_results)} 个高分文档,丢弃 {len(low_score_results)} 个低分文档")
- self.logger.info(f"[async_bfp_recall] 总耗时: {total_time:.3f}秒 (召回: {gather_end-gather_start:.3f}s + 重排: {rerank_end-rerank_start:.3f}s + 其他: {total_time-(gather_end-gather_start)-(rerank_end-rerank_start):.3f}s)")
- return final_results
- def hybrid_search_recall(self, collection_name: str, query_text: str,
- top_k: int = 10 , ranker_type: str = "weighted",
- dense_weight: float = 0.7, sparse_weight: float = 0.3) -> List[Dict[str, Any]]:
- """
- 混合搜索召回 - 向量+BM25召回
- Args:
- collection_name: 集合名称
- query_text: 查询文本
- top_k: 返回结果数量
- ranker_type: 重排序类型 "weighted" 或 "rrf"
- dense_weight: 密集向量权重
- sparse_weight: 稀疏向量权重
- Returns:
- List[Dict]: 搜索结果列表
- """
- try:
- self.logger.info(f"开始混合检索")
- param = {'collection_name': collection_name}
- # 直接调用同步的混合搜索(在同步方法中)
- results = self.vector_manager.hybrid_search(
- param=param,
- query_text=query_text,
- top_k=top_k,
- ranker_type=ranker_type,
- dense_weight=dense_weight,
- sparse_weight=sparse_weight
- )
- # 详细记录混合搜索结果
- self.logger.info(f"混合搜索召回返回 {len(results)} 个结果")
- # for i, result in enumerate(results):
- # text_content = result.get('text_content', '')
- # metadata = result.get('metadata', {})
- # title = metadata.get('title', 'N/A')
- # file = metadata.get('file', 'N/A')
- # self.logger.info(f"混合搜索结果 {i+1}: 标题='{title}', 文件='{file}', 内容长度={len(text_content)}")
- # # self.logger.info(f" 完整元数据: {metadata}")
- # # self.logger.info(f" 文本内容: '{text_content}'")
- return results
- except Exception as e:
- self.logger.error(f"混合搜索召回失败: {str(e)}")
- return []
- def rerank_recall(self, candidates_with_metadata: List[Dict[str, Any]], query_text: str,
- top_k: int = None ) -> List[Dict[str, Any]]:
- """
- 重排序召回 - 使用配置的重排序模型对候选文档重新排序
- Args:
- candidates_with_metadata: 候选文档列表,包含文本内容和元数据
- query_text: 查询文本
- top_k: 返回结果数量
- Returns:
- List[Dict]: 重排序后的结果列表,包含原始索引信息
- """
- try:
- # 第一步:基于文本内容+元数据的组合去重
- unique_candidates = []
- original_indices_map = [] # 记录每个去重后的候选文档对应的原始索引列表
- unique_combinations = set() # 记录已见过的文本+元数据组合
- for original_index, candidate in enumerate(candidates_with_metadata):
- text_content = candidate.get('text_content', '')
- metadata = candidate.get('metadata', {})
- # 处理嵌套的metadata字符串
- title = ''
- file = ''
- if 'metadata' in metadata and isinstance(metadata['metadata'], str):
- import json
- try:
- # 解析JSON格式的metadata
- inner_metadata = json.loads(metadata['metadata'])
- title = inner_metadata.get('title', '')
- file = inner_metadata.get('file', '')
- except (json.JSONDecodeError, TypeError):
- pass
- else:
- title = metadata.get('title', '')
- file = metadata.get('file', '')
- # 创建组合键:文本内容 + 关键元数据
- combination_key = (text_content, title, file)
- if combination_key not in unique_combinations:
- # 新的唯一组合
- unique_candidates.append(candidate)
- original_indices_map.append([original_index])
- unique_combinations.add(combination_key)
- else:
- # 找到对应的唯一候选并添加索引
- for unique_idx, unique_candidate in enumerate(unique_candidates):
- if unique_candidate.get('text_content', '') == text_content:
- # 解析唯一候选的元数据
- unique_metadata = unique_candidate.get('metadata', {})
- unique_title = ''
- unique_file = ''
- if 'metadata' in unique_metadata and isinstance(unique_metadata['metadata'], str):
- import json
- try:
- inner_metadata = json.loads(unique_metadata['metadata'])
- unique_title = inner_metadata.get('title', '')
- unique_file = inner_metadata.get('file', '')
- except (json.JSONDecodeError, TypeError):
- pass
- else:
- unique_title = unique_metadata.get('title', '')
- unique_file = unique_metadata.get('file', '')
- if unique_title == title and unique_file == file:
- original_indices_map[unique_idx].append(original_index)
- break
- # 提取唯一候选文档的文本内容用于重排序
- unique_texts = [candidate.get('text_content', '') for candidate in unique_candidates]
- # 使用配置的重排序模型进行重排序
- rerank_results = self._get_rerank_results(query_text, unique_texts, top_k)
- # 转换结果格式,使用索引映射来处理原始索引
- scored_docs = []
- for i, api_result in enumerate(rerank_results):
- rerank_text = api_result.get('text', '')
- rerank_score = float(api_result.get('score', '0.0'))
- # 使用去重时的索引映射
- original_index = original_indices_map[i][0] # 取第一个原始索引
- original_candidate = unique_candidates[i] # 获取原始候选文档(包含元数据)
- # 获取原始混合搜索的评分信息
- hybrid_distance = original_candidate.get('distance', 0.0)
- hybrid_similarity = original_candidate.get('similarity', 0.0)
- # 解析元数据获取标题用于日志
- metadata = original_candidate.get('metadata', {})
- title = 'N/A'
- if 'metadata' in metadata and isinstance(metadata['metadata'], str):
- try:
- import json
- inner_metadata = json.loads(metadata['metadata'])
- title = inner_metadata.get('title', 'N/A')
- except:
- pass
- scored_docs.append({
- 'text_content': rerank_text,
- 'metadata': original_candidate.get('metadata', {}), # 保留原始元数据
- 'rerank_score': rerank_score,
- 'original_index': original_index,
- 'rerank_rank': i,
- 'duplicate_count': len(original_indices_map[i]), # 记录重复数量
- 'hybrid_distance': hybrid_distance, # 保留原始混合搜索评分
- 'hybrid_similarity': hybrid_similarity
- })
- # 输出双重评分信息
- # self.logger.info(f"重排序评分 #{i+1}: 标题='{title}' | 混合搜索相似度={hybrid_similarity:.4f} | BGE重排序评分={rerank_score:.6f}")
- return scored_docs
- except Exception as e:
- self.logger.error(f"重排序召回失败: {str(e)}")
- return []
- def multi_stage_recall(self, collection_name: str, query_text: str,
- hybrid_top_k: int = 50, top_k: int = 10,
- ranker_type: str = "weighted") -> List[Dict[str, Any]]:
- """
- 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
- Args:
- collection_name: 集合名称
- query_text: 查询文本
- hybrid_top_k: 混合搜索召回的文档数量
- top_k: 最终返回的文档数量
- ranker_type: 混合搜索的重排序类型
- Returns:
- List[Dict]: 重排序后的结果列表,只包含重排序分数
- """
- try:
- self.logger.info(f"执行多路召回")
- # 第一阶段:混合搜索召回(向量+BM25)
- hybrid_results = self.hybrid_search_recall(
- collection_name=collection_name,
- query_text=query_text,
- top_k=hybrid_top_k,
- ranker_type=ranker_type
- )
- if not hybrid_results:
- self.logger.warning("混合搜索召回无结果,返回空列表")
- return []
- # 第二阶段:重排序召回,传递完整的混合搜索结果(包含元数据)
- rerank_results = self.rerank_recall(
- candidates_with_metadata=hybrid_results,
- query_text=query_text,
- top_k=top_k
- )
- # 优化重排序结果的元数据结构
- final_results = []
- for rerank_result in rerank_results:
- metadata = rerank_result.get('metadata', {}).copy()
- duplicate_count = rerank_result.get('duplicate_count', 1)
- # 如果内层有metadata字段,将其提取到外层
- if 'metadata' in metadata and isinstance(metadata['metadata'], str):
- import json
- try:
- # 解析JSON格式的metadata
- inner_metadata = json.loads(metadata['metadata'])
- metadata.update(inner_metadata)
- # 移除内层的metadata字符串,避免重复
- del metadata['metadata']
- except (json.JSONDecodeError, TypeError):
- # 如果解析失败,保持原样
- pass
- # 移除重复的content字段
- if 'content' in metadata:
- del metadata['content']
- # 添加重复计数信息到元数据中
- if duplicate_count > 1:
- metadata['duplicate_count'] = duplicate_count
- # 输出优化后的结果,包含双重评分
- final_result = {
- 'text_content': rerank_result['text_content'],
- 'metadata': metadata,
- 'hybrid_similarity': rerank_result.get('hybrid_similarity', 0.0), # 混合搜索相似度
- 'rerank_score': rerank_result.get('rerank_score', 0.0) # BGE重排序评分
- }
- final_results.append(final_result)
- self.logger.debug(f"元数据优化完成: 重排序排名{rerank_result.get('rerank_rank')}, 重复数量={duplicate_count}")
- return final_results
- except Exception as e:
- self.logger.error(f"多路召回失败: {str(e)}")
- return []
- async def async_multi_stage_recall(self, collection_name: str, query_text: str,
- hybrid_top_k: int = 50, top_k: int = 10,
- ranker_type: str = "weighted") -> List[Dict[str, Any]]:
- """
- 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
- Args:
- collection_name: 集合名称
- query_text: 查询文本
- hybrid_top_k: 混合搜索召回的文档数量
- top_k: 最终返回的文档数量
- ranker_type: 混合搜索的重排序类型
- Returns:
- List[Dict]: 重排序后的结果列表,只包含重排序分数
- """
- import time
- try:
- start_time = time.time()
- # 第一阶段:混合搜索召回(向量+BM25)
- hybrid_start = time.time()
- hybrid_results = await asyncio.to_thread(
- self.hybrid_search_recall,
- collection_name=collection_name,
- query_text=query_text,
- top_k=hybrid_top_k,
- ranker_type=ranker_type
- )
- if not hybrid_results:
- return []
- # 第二阶段:重排序召回
- rerank_results = self.rerank_recall(
- candidates_with_metadata=hybrid_results,
- query_text=query_text,
- top_k=top_k
- )
- # 优化重排序结果的元数据结构
- final_results = []
- for rerank_result in rerank_results:
- metadata = rerank_result.get('metadata', {}).copy()
- duplicate_count = rerank_result.get('duplicate_count', 1)
- # 如果内层有metadata字段,将其提取到外层
- if 'metadata' in metadata and isinstance(metadata['metadata'], str):
- import json
- try:
- # 解析JSON格式的metadata
- inner_metadata = json.loads(metadata['metadata'])
- metadata.update(inner_metadata)
- # 移除内层的metadata字符串,避免重复
- del metadata['metadata']
- except (json.JSONDecodeError, TypeError):
- # 如果解析失败,保持原样
- pass
- # 移除重复的content字段
- if 'content' in metadata:
- del metadata['content']
- # 添加重复计数信息到元数据中
- if duplicate_count > 1:
- metadata['duplicate_count'] = duplicate_count
- # 输出优化后的结果,包含双重评分
- final_result = {
- 'text_content': rerank_result['text_content'],
- 'metadata': metadata,
- 'hybrid_similarity': rerank_result.get('hybrid_similarity', 0.0), # 混合搜索相似度
- 'rerank_score': rerank_result.get('rerank_score', 0.0) # BGE重排序评分
- }
- final_results.append(final_result)
- self.logger.debug(f"元数据优化完成: 重排序排名{rerank_result.get('rerank_rank')}, 重复数量={duplicate_count}")
- return final_results
- except Exception as e:
- self.logger.error(f"多路召回失败: {str(e)}")
- return []
# Module-level singleton retrieval manager instance.
retrieval_manager = RetrievalManager()
|