retrieval.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705
  1. import asyncio
  2. import json
  3. from typing import List, Dict, Any, Optional
  4. from foundation.ai.models.rerank_model import rerank_model
  5. from foundation.observability.monitoring.time_statistics import track_execution_time
  6. from foundation.infrastructure.config.config import config_handler
  7. from foundation.observability.logger.loggering import review_logger
  8. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  9. # model_setting.yaml 模型名 → (rerank_model 方法名, 日志描述)
  10. # 兼容旧 config.ini section 名(如 'bge_rerank_model')以保证平稳过渡
  11. RERANK_MODEL_ROUTE = {
  12. # model_setting.yaml 标准名称
  13. 'shutian_qwen3_reranker': ('shutian_rerank', '蜀天云算力 Qwen3-Reranker-8B'),
  14. 'lq_bge_reranker_v2_m3': ('bge_rerank', '本地 BGE-reranker-v2-m3'),
  15. 'lq_qwen3_reranker': ('lq_rerank', '本地 Qwen3-Reranker-8B'),
  16. 'silicoflow_qwen3_reranker': ('qwen3_rerank', '硅基流动 Qwen3-Reranker-8B'),
  17. # 旧 config.ini section 名(向后兼容)
  18. 'bge_rerank_model': ('bge_rerank', '本地 BGE-reranker-v2-m3'),
  19. 'lq_rerank_model': ('lq_rerank', '本地 Qwen3-Reranker-8B'),
  20. 'silicoflow_rerank_model': ('qwen3_rerank', '硅基流动 Qwen3-Reranker-8B'),
  21. 'shutian_rerank_model': ('shutian_rerank', '蜀天云算力 Qwen3-Reranker-8B'),
  22. }
  23. VALID_RERANK_MODELS = list(RERANK_MODEL_ROUTE.keys())
  24. class RetrievalManager:
  25. """
  26. 召回管理器,实现多路召回功能
  27. """
  28. def __init__(self):
  29. """
  30. 初始化召回管理器
  31. """
  32. self.vector_manager = MilvusVectorManager()
  33. self.logger = review_logger
  34. self.dense_weight = config_handler.get('hybrid_search', 'DENSE_WEIGHT', 0.7)
  35. self.sparse_weight = config_handler.get('hybrid_search', 'SPARSE_WEIGHT', 0.3)
  36. # 重排序模型配置:优先从 model_setting.yaml 读取
  37. rerank_model_name = self._resolve_rerank_model()
  38. self.rerank_model_type = rerank_model_name
  39. self.logger.info(f"初始化重排序模型类型: {self.rerank_model_type}")
  40. @staticmethod
  41. def _resolve_rerank_model() -> str:
  42. """从 model_setting.yaml 读取 rerank 模型"""
  43. try:
  44. from foundation.ai.models.model_config_loader import get_model_for_function
  45. model_name = get_model_for_function("rerank")
  46. if model_name and model_name in RERANK_MODEL_ROUTE:
  47. return model_name
  48. except Exception:
  49. pass
  50. return "shutian_qwen3_reranker"
  51. def set_rerank_model(self, model_type: str):
  52. """
  53. 设置重排序模型类型
  54. Args:
  55. model_type: 模型名称(支持 model_setting.yaml 名称或旧 config.ini section 名)
  56. """
  57. if model_type not in VALID_RERANK_MODELS:
  58. raise ValueError(f"model_type 必须是 {VALID_RERANK_MODELS}")
  59. self.rerank_model_type = model_type
  60. self.logger.info(f"重排序模型类型已设置为: {model_type}")
  61. def _clean_document(self, doc: str) -> str:
  62. """
  63. 清理文档文本,移除HTML标签和特殊字符
  64. Args:
  65. doc: 原始文档文本
  66. Returns:
  67. str: 清理后的文档文本
  68. """
  69. if not isinstance(doc, str):
  70. self.logger.debug(f"文档类型转换: {type(doc)} -> str")
  71. return str(doc)
  72. original_length = len(doc)
  73. # 移除HTML标签
  74. import re
  75. doc = re.sub(r'<[^>]+>', '', doc)
  76. # 移除多余的空白字符
  77. doc = re.sub(r'\s+', ' ', doc)
  78. # 更宽松的字符过滤 - 保留更多字符
  79. doc = re.sub(r'[^\u4e00-\u9fff\w\s.,;:!?()()。,;:!?\-\+\=\*/%&@#¥$【】「」""''""\n\r]', '', doc)
  80. # 截断过长的文本
  81. if len(doc) > 8000: # 设置最大长度限制
  82. doc = doc[:8000] + "..."
  83. cleaned_doc = doc.strip()
  84. self.logger.debug(f"文档清理: {original_length} -> {len(cleaned_doc)} 字符")
  85. return cleaned_doc
  86. def _get_rerank_results(self, query_text: str, documents: List[str], top_k: int = None) -> List[Dict[str, Any]]:
  87. """
  88. 根据配置选择重排序模型并执行重排序
  89. Args:
  90. query_text: 查询文本
  91. documents: 文档列表
  92. top_k: 返回结果数量
  93. Returns:
  94. List[Dict]: 重排序后的结果列表
  95. """
  96. try:
  97. # 清理和验证文档列表
  98. cleaned_documents = []
  99. valid_original_docs = []
  100. for doc in documents:
  101. if doc and isinstance(doc, str) and doc.strip():
  102. cleaned_doc = self._clean_document(doc)
  103. if cleaned_doc and len(cleaned_doc) > 3:
  104. cleaned_documents.append(cleaned_doc)
  105. valid_original_docs.append(doc)
  106. if not cleaned_documents:
  107. return []
  108. # 根据模型名称路由到对应的reranker方法
  109. method_name, log_desc = RERANK_MODEL_ROUTE.get(
  110. self.rerank_model_type,
  111. ('bge_rerank', '默认 BGE Reranker')
  112. )
  113. self.logger.info(f"使用 {log_desc} ({self.rerank_model_type}) 进行重排序")
  114. rerank_results = getattr(rerank_model, method_name)(query_text, cleaned_documents, top_k)
  115. # 将清理后的文本映射回原始文本(所有reranker都需要)
  116. for result in rerank_results:
  117. cleaned_text = result.get('text', '')
  118. # 查找原始文本
  119. for i, cleaned in enumerate(cleaned_documents):
  120. if cleaned == cleaned_text:
  121. result['text'] = valid_original_docs[i]
  122. break
  123. # 统一字段名:将 relevance_score 转换为 score
  124. if 'relevance_score' in result and 'score' not in result:
  125. result['score'] = float(result['relevance_score'])
  126. return rerank_results
  127. except Exception as e:
  128. self.logger.error(f"重排序失败,模型类型: {self.rerank_model_type}, 错误: {str(e)}")
  129. # 返回原始顺序作为fallback
  130. return [{"text": doc, "score": 0.0} for i, doc in enumerate(documents[:top_k])]
  131. @track_execution_time
  132. async def entity_recall(self, main_entity: str, assisted_search_entity: list,
  133. recall_top_k: int = 5, max_results: int = None) -> List[str]:
  134. """
  135. 执行实体召回
  136. Args:
  137. main_entity: 主查询实体
  138. assisted_search_entity: 辅助搜索实体列表
  139. recall_top_k: 每次单实体召回返回的数量(默认5)
  140. max_results: 最终返回的最大数量,如果为None则返回所有召回结果(默认None)
  141. Returns:
  142. List[str]: 实体文本内容列表
  143. Note:
  144. 实际返回数量 = min(max_results, 主实体召回数 + 所有辅助实体召回数)
  145. 如果不设置max_results,可能返回较多结果(取决于辅助实体数量)
  146. """
  147. self.logger.info(f"[entity_recall] 开始召回, recall_top_k={recall_top_k}, max_results={max_results}, 主实体='{main_entity}', 辅助实体数量={len(assisted_search_entity)}")
  148. collection_name = config_handler.get('rag_collections', 'ENTITY_COLLECTION', 'first_bfp_collection_entity')
  149. # 主实体搜索 - 使用异步方法
  150. entity_result = await self.async_multi_stage_recall(
  151. collection_name=collection_name,
  152. query_text=main_entity,
  153. hybrid_top_k=10, # 降低召回数量,减少搜索耗时和上下文量
  154. top_k=recall_top_k
  155. )
  156. self.logger.info(f"[entity_recall] 主实体召回完成, 返回 {len(entity_result)} 个结果")
  157. assist_tasks = [
  158. self.async_multi_stage_recall(
  159. collection_name=collection_name,
  160. query_text=assisted_search_entity,
  161. hybrid_top_k=10, # 降低召回数量,减少搜索耗时和上下文量
  162. top_k=recall_top_k
  163. ) for assisted_search_entity in assisted_search_entity
  164. ]
  165. # 辅助搜索,异步并发
  166. assist_results_list = await asyncio.gather(*assist_tasks,return_exceptions=True)
  167. assist_results = []
  168. for res in assist_results_list:
  169. if isinstance(res, Exception):
  170. self.logger.error(f"辅助实体召回失败: {str(res)}")
  171. else:
  172. assist_results.extend(res)
  173. all_results = entity_result + assist_results
  174. # if self.rerank_model_type == 'silicoflow_rerank_model':
  175. # with open("temp\entity_bfp_recall\silicoflow_rerank_model.json", "w", encoding="utf-8") as f:
  176. # json.dump(all_results, f, ensure_ascii=False, indent=4)
  177. # elif self.rerank_model_type == 'lq_rerank_model':
  178. # with open("temp\entity_bfp_recall\lq_rerank_model.json", "w", encoding="utf-8") as f:
  179. # json.dump(all_results, f, ensure_ascii=False, indent=4)
  180. # 去重并提取文本内容
  181. entity_list = list(set([item['text_content'] for item in all_results]))
  182. # 如果设置了max_results,进行截断
  183. if max_results is not None and len(entity_list) > max_results:
  184. entity_list = entity_list[:max_results]
  185. self.logger.info(f"[entity_recall] 结果截断到 max_results={max_results}")
  186. self.logger.info(f"entity_list_len:{len(entity_list)}")
  187. return entity_list
  188. @track_execution_time
  189. async def async_bfp_recall(self, entity_list: List[str],background: str ,
  190. top_k: int = 3,) -> List[Dict[str, Any]]:
  191. """
  192. 混合搜索召回 - 向量+BM25召回
  193. Args:
  194. entity_list: 实体列表
  195. background: 背景/上下文信息,用于二次重排
  196. top_k: 返回结果数量
  197. """
  198. import time
  199. start_time = time.time()
  200. self.logger.info(f"[async_bfp_recall] 开始召回, top_k={top_k}, 实体数量={len(entity_list)}, 背景='{background[:50]}...'")
  201. # 异步并发召回编制依据
  202. collection_name = config_handler.get('rag_collections', 'CHILDREN_COLLECTION', 'rag_children_hybrid')
  203. gather_start = time.time()
  204. # 优化:降低hybrid_top_k参数从50到20,减少混合搜索时间
  205. bfp_tasks = [
  206. self.async_multi_stage_recall(
  207. collection_name=collection_name,
  208. query_text=entity,
  209. hybrid_top_k=10, # 从50降到20,减少60%的混合搜索时间
  210. top_k=top_k
  211. ) for entity in entity_list
  212. ]
  213. bfp_tasks_list = await asyncio.gather(*bfp_tasks,return_exceptions=True)
  214. gather_end = time.time()
  215. bfp_results = []
  216. for res in bfp_tasks_list:
  217. if isinstance(res, Exception):
  218. self.logger.error(f"辅助实体召回失败: {str(res)}")
  219. else:
  220. bfp_results.extend(res)
  221. self.logger.info(f"[async_bfp_recall] 第一阶段召回完成, 共召回 {len(bfp_results)} 个文档")
  222. # BFP召回结果已经通过multi_stage_recall进行了重排序,保持原有顺序
  223. # 只对第一次重排序得分大于0.6的文档进行二次重排序
  224. high_score_results = [item for item in bfp_results if (item.get('rerank_score') or 0) > 0.6]
  225. low_score_results = [item for item in bfp_results if (item.get('rerank_score') or 0) <= 0.6]
  226. self.logger.info(f"筛选结果:高分文档(>0.6) {len(high_score_results)} 个,低分文档(≤0.6) {len(low_score_results)} 个")
  227. # 如果没有高分文档,直接返回top_k个结果(按hybrid_similarity排序)
  228. if not high_score_results:
  229. self.logger.info(f"没有得分大于0.8的文档,跳过二次重排序,返回top_k={top_k}个结果(按hybrid_similarity排序)")
  230. # 按 hybrid_similarity 降序排序,返回 top_k 个
  231. sorted_results = sorted(bfp_results, key=lambda x: x.get('hybrid_similarity') or 0, reverse=True)
  232. return sorted_results[:top_k]
  233. # 检查background是否为空,如果为空则跳过二次重排序
  234. if not background or not background.strip():
  235. self.logger.warning("background为空,跳过二次重排序,直接返回高分文档")
  236. return high_score_results
  237. # 提取高分文档的文本内容用于二次重排(保持顺序去重)
  238. seen_texts = set()
  239. high_score_text_content = []
  240. for item in high_score_results:
  241. text = item['text_content']
  242. if text not in seen_texts:
  243. seen_texts.add(text)
  244. high_score_text_content.append(text)
  245. self.logger.info(f"提取高分文档文本内容,共 {len(high_score_text_content)} 个,准备二次重排")
  246. # 二次重排 - 使用配置的重排序模型
  247. rerank_start = time.time()
  248. # 使用传入的 top_k 参数,而不是硬编码为5
  249. bfp_rerank_result = self._get_rerank_results(background, high_score_text_content, top_k)
  250. rerank_end = time.time()
  251. self.logger.info(f"二次重排序耗时: {rerank_end - rerank_start:.3f}秒, top_k={top_k}")
  252. # 根据重排结果重新组织数据
  253. reorganize_start = time.time()
  254. final_results = []
  255. # 构建 text_content -> 原始文档列表 的映射(保留所有匹配项)
  256. text_to_items = {}
  257. for item in high_score_results:
  258. text = item['text_content']
  259. if text not in text_to_items:
  260. text_to_items[text] = []
  261. text_to_items[text].append(item)
  262. # 处理二次重排序的高分文档
  263. added_texts = set() # 用于跟踪已添加的文本,避免重复
  264. for rerank_item in bfp_rerank_result:
  265. text = rerank_item.get('text', '')
  266. parent_id = rerank_item.get('parent_id', '')
  267. score = rerank_item.get('score', 0.0)
  268. if text in text_to_items and text not in added_texts:
  269. # 获取该文本的所有候选文档,选择 rerank_score 最高的
  270. candidates = text_to_items[text]
  271. best_candidate = max(candidates, key=lambda x: x.get('rerank_score', 0.0))
  272. result_item = best_candidate.copy()
  273. result_item['bfp_rerank_score'] = score
  274. result_item['bfp_rerank_parent_id'] = parent_id
  275. final_results.append(result_item)
  276. added_texts.add(text) # 标记该文本已添加
  277. reorganize_end = time.time()
  278. total_time = reorganize_end - start_time
  279. self.logger.info(f"结果重组耗时: {reorganize_end - reorganize_start:.3f}秒")
  280. self.logger.info(f"二次重排完成,返回 {len(final_results)} 个高分文档(top_k={top_k}),丢弃 {len(low_score_results)} 个低分文档")
  281. self.logger.info(f"[async_bfp_recall] 总耗时: {total_time:.3f}秒 (召回: {gather_end-gather_start:.3f}s + 重排: {rerank_end-rerank_start:.3f}s + 其他: {total_time-(gather_end-gather_start)-(rerank_end-rerank_start):.3f}s)")
  282. return final_results
  283. def hybrid_search_recall(self, collection_name: str, query_text: str,
  284. top_k: int = 10 , ranker_type: str = "weighted",
  285. dense_weight: float = 0.7, sparse_weight: float = 0.3) -> List[Dict[str, Any]]:
  286. """
  287. 混合搜索召回 - 向量+BM25召回
  288. Args:
  289. collection_name: 集合名称
  290. query_text: 查询文本
  291. top_k: 返回结果数量
  292. ranker_type: 重排序类型 "weighted" 或 "rrf"
  293. dense_weight: 密集向量权重
  294. sparse_weight: 稀疏向量权重
  295. Returns:
  296. List[Dict]: 搜索结果列表
  297. """
  298. try:
  299. self.logger.info(f"开始混合检索")
  300. param = {'collection_name': collection_name}
  301. # 直接调用同步的混合搜索(在同步方法中)
  302. results = self.vector_manager.hybrid_search(
  303. param=param,
  304. query_text=query_text,
  305. top_k=top_k,
  306. ranker_type=ranker_type,
  307. dense_weight=dense_weight,
  308. sparse_weight=sparse_weight
  309. )
  310. # 详细记录混合搜索结果
  311. self.logger.info(f"混合搜索召回返回 {len(results)} 个结果")
  312. return results
  313. except Exception as e:
  314. self.logger.error(f"混合搜索召回失败: {str(e)}")
  315. return []
  316. def rerank_recall(self, candidates_with_metadata: List[Dict[str, Any]], query_text: str,
  317. top_k: int = None ) -> List[Dict[str, Any]]:
  318. """
  319. 重排序召回 - 使用配置的重排序模型对候选文档重新排序
  320. Args:
  321. candidates_with_metadata: 候选文档列表,包含文本内容和元数据
  322. query_text: 查询文本
  323. top_k: 返回结果数量
  324. Returns:
  325. List[Dict]: 重排序后的结果列表,包含原始索引信息
  326. """
  327. try:
  328. # 第一步:基于文本内容+元数据的组合去重
  329. unique_candidates = []
  330. original_indices_map = [] # 记录每个去重后的候选文档对应的原始索引列表
  331. unique_combinations = set() # 记录已见过的文本+元数据组合
  332. for original_index, candidate in enumerate(candidates_with_metadata):
  333. text_content = candidate.get('text_content', '')
  334. metadata = candidate.get('metadata', {})
  335. # 处理嵌套的metadata字符串
  336. title = ''
  337. file = ''
  338. if 'metadata' in metadata and isinstance(metadata['metadata'], str):
  339. import json
  340. try:
  341. # 解析JSON格式的metadata
  342. inner_metadata = json.loads(metadata['metadata'])
  343. title = inner_metadata.get('title', '')
  344. file = inner_metadata.get('file', '')
  345. except (json.JSONDecodeError, TypeError):
  346. pass
  347. else:
  348. title = metadata.get('title', '')
  349. file = metadata.get('file', '')
  350. # 创建组合键:文本内容 + 关键元数据
  351. combination_key = (text_content, title, file)
  352. if combination_key not in unique_combinations:
  353. # 新的唯一组合
  354. unique_candidates.append(candidate)
  355. original_indices_map.append([original_index])
  356. unique_combinations.add(combination_key)
  357. else:
  358. # 找到对应的唯一候选并添加索引
  359. for unique_idx, unique_candidate in enumerate(unique_candidates):
  360. if unique_candidate.get('text_content', '') == text_content:
  361. # 解析唯一候选的元数据
  362. unique_metadata = unique_candidate.get('metadata', {})
  363. unique_title = ''
  364. unique_file = ''
  365. if 'metadata' in unique_metadata and isinstance(unique_metadata['metadata'], str):
  366. import json
  367. try:
  368. inner_metadata = json.loads(unique_metadata['metadata'])
  369. unique_title = inner_metadata.get('title', '')
  370. unique_file = inner_metadata.get('file', '')
  371. except (json.JSONDecodeError, TypeError):
  372. pass
  373. else:
  374. unique_title = unique_metadata.get('title', '')
  375. unique_file = unique_metadata.get('file', '')
  376. if unique_title == title and unique_file == file:
  377. original_indices_map[unique_idx].append(original_index)
  378. break
  379. # 提取唯一候选文档的文本内容用于重排序
  380. unique_texts = [candidate.get('text_content', '') for candidate in unique_candidates]
  381. # 使用配置的重排序模型进行重排序
  382. rerank_results = self._get_rerank_results(query_text, unique_texts, top_k)
  383. # 转换结果格式,使用索引映射来处理原始索引
  384. scored_docs = []
  385. for i, api_result in enumerate(rerank_results):
  386. rerank_text = api_result.get('text', '')
  387. rerank_score = float(api_result.get('score', '0.0'))
  388. # 根据 rerank_text 在 unique_candidates 中查找匹配项
  389. # (rerank 会改变顺序,不能直接用索引 i)
  390. found_index = None
  391. original_candidate = None
  392. for idx, candidate in enumerate(unique_candidates):
  393. if candidate.get('text_content', '') == rerank_text:
  394. found_index = idx
  395. original_candidate = candidate
  396. break
  397. if original_candidate is None:
  398. self.logger.warning(f"[rerank_recall] 未找到匹配的候选文档,跳过: {rerank_text[:50]}...")
  399. continue
  400. # 使用找到的索引获取原始索引映射
  401. original_index = original_indices_map[found_index][0] if found_index < len(original_indices_map) else i
  402. # 获取原始混合搜索的评分信息
  403. hybrid_distance = original_candidate.get('distance', 0.0)
  404. hybrid_similarity = original_candidate.get('similarity', 0.0)
  405. # 解析元数据获取标题用于日志
  406. metadata = original_candidate.get('metadata', {})
  407. title = 'N/A'
  408. if 'metadata' in metadata and isinstance(metadata['metadata'], str):
  409. try:
  410. import json
  411. inner_metadata = json.loads(metadata['metadata'])
  412. title = inner_metadata.get('title', 'N/A')
  413. except:
  414. pass
  415. scored_docs.append({
  416. 'text_content': rerank_text,
  417. 'metadata': original_candidate.get('metadata', {}), # 保留原始元数据
  418. 'rerank_score': rerank_score,
  419. 'original_index': original_index,
  420. 'rerank_rank': i,
  421. 'duplicate_count': len(original_indices_map[i]), # 记录重复数量
  422. 'hybrid_distance': hybrid_distance, # 保留原始混合搜索评分
  423. 'hybrid_similarity': hybrid_similarity
  424. })
  425. return scored_docs
  426. except Exception as e:
  427. self.logger.error(f"重排序召回失败: {str(e)}")
  428. return []
  429. def multi_stage_recall(self, collection_name: str, query_text: str,
  430. hybrid_top_k: int = 50, top_k: int = 10,
  431. ranker_type: str = "weighted") -> List[Dict[str, Any]]:
  432. """
  433. 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
  434. Args:
  435. collection_name: 集合名称
  436. query_text: 查询文本
  437. hybrid_top_k: 混合搜索召回的文档数量
  438. top_k: 最终返回的文档数量
  439. ranker_type: 混合搜索的重排序类型
  440. Returns:
  441. List[Dict]: 重排序后的结果列表,只包含重排序分数
  442. """
  443. try:
  444. self.logger.info(f"执行多路召回")
  445. # 第一阶段:混合搜索召回(向量+BM25)
  446. hybrid_results = self.hybrid_search_recall(
  447. collection_name=collection_name,
  448. query_text=query_text,
  449. top_k=hybrid_top_k,
  450. ranker_type=ranker_type
  451. )
  452. if not hybrid_results:
  453. self.logger.warning("混合搜索召回无结果,返回空列表")
  454. return []
  455. # 第二阶段:重排序召回,传递完整的混合搜索结果(包含元数据)
  456. rerank_results = self.rerank_recall(
  457. candidates_with_metadata=hybrid_results,
  458. query_text=query_text,
  459. top_k=top_k
  460. )
  461. # 优化重排序结果的元数据结构
  462. final_results = []
  463. for rerank_result in rerank_results:
  464. metadata = rerank_result.get('metadata', {}).copy()
  465. duplicate_count = rerank_result.get('duplicate_count', 1)
  466. # 如果内层有metadata字段,将其提取到外层
  467. if 'metadata' in metadata:
  468. inner_raw = metadata['metadata']
  469. inner_metadata = None
  470. if isinstance(inner_raw, dict):
  471. # Milvus JSON 字段直接返回 dict
  472. inner_metadata = inner_raw
  473. elif isinstance(inner_raw, str):
  474. import json
  475. try:
  476. inner_metadata = json.loads(inner_raw)
  477. except (json.JSONDecodeError, TypeError):
  478. pass
  479. if inner_metadata and isinstance(inner_metadata, dict):
  480. metadata.update(inner_metadata)
  481. del metadata['metadata']
  482. # 移除重复的content字段
  483. if 'content' in metadata:
  484. del metadata['content']
  485. # 添加重复计数信息到元数据中
  486. if duplicate_count > 1:
  487. metadata['duplicate_count'] = duplicate_count
  488. # 输出优化后的结果,包含双重评分
  489. final_result = {
  490. 'text_content': rerank_result['text_content'],
  491. 'metadata': metadata,
  492. 'hybrid_similarity': rerank_result.get('hybrid_similarity', 0.0), # 混合搜索相似度
  493. 'rerank_score': rerank_result.get('rerank_score', 0.0) # BGE重排序评分
  494. }
  495. final_results.append(final_result)
  496. self.logger.debug(f"元数据优化完成: 重排序排名{rerank_result.get('rerank_rank')}, 重复数量={duplicate_count}")
  497. return final_results
  498. except Exception as e:
  499. self.logger.error(f"多路召回失败: {str(e)}")
  500. return []
  501. async def async_multi_stage_recall(self, collection_name: str, query_text: str,
  502. hybrid_top_k: int = 50, top_k: int = 10,
  503. ranker_type: str = "weighted") -> List[Dict[str, Any]]:
  504. """
  505. 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
  506. Args:
  507. collection_name: 集合名称
  508. query_text: 查询文本
  509. hybrid_top_k: 混合搜索召回的文档数量
  510. top_k: 最终返回的文档数量
  511. ranker_type: 混合搜索的重排序类型
  512. Returns:
  513. List[Dict]: 重排序后的结果列表,只包含重排序分数
  514. """
  515. import time
  516. try:
  517. start_time = time.time()
  518. # 第一阶段:混合搜索召回(向量+BM25)
  519. hybrid_results = await asyncio.to_thread(
  520. self.hybrid_search_recall,
  521. collection_name=collection_name,
  522. query_text=query_text,
  523. top_k=hybrid_top_k,
  524. ranker_type=ranker_type
  525. )
  526. if not hybrid_results:
  527. return []
  528. # 第二阶段:重排序召回
  529. rerank_results = self.rerank_recall(
  530. candidates_with_metadata=hybrid_results,
  531. query_text=query_text,
  532. top_k=top_k
  533. )
  534. # 优化重排序结果的元数据结构
  535. final_results = []
  536. for rerank_result in rerank_results:
  537. metadata = rerank_result.get('metadata', {}).copy()
  538. duplicate_count = rerank_result.get('duplicate_count', 1)
  539. # 如果内层有metadata字段,将其提取到外层
  540. if 'metadata' in metadata:
  541. inner_raw = metadata['metadata']
  542. inner_metadata = None
  543. if isinstance(inner_raw, dict):
  544. # Milvus JSON 字段直接返回 dict
  545. inner_metadata = inner_raw
  546. elif isinstance(inner_raw, str):
  547. import json
  548. try:
  549. inner_metadata = json.loads(inner_raw)
  550. except (json.JSONDecodeError, TypeError):
  551. pass
  552. if inner_metadata and isinstance(inner_metadata, dict):
  553. metadata.update(inner_metadata)
  554. del metadata['metadata']
  555. # 移除重复的content字段
  556. if 'content' in metadata:
  557. del metadata['content']
  558. # 添加重复计数信息到元数据中
  559. if duplicate_count > 1:
  560. metadata['duplicate_count'] = duplicate_count
  561. # 输出优化后的结果,包含双重评分
  562. final_result = {
  563. 'text_content': rerank_result['text_content'],
  564. 'metadata': metadata,
  565. 'hybrid_similarity': rerank_result.get('hybrid_similarity', 0.0), # 混合搜索相似度
  566. 'rerank_score': rerank_result.get('rerank_score', 0.0) # BGE重排序评分
  567. }
  568. final_results.append(final_result)
  569. self.logger.debug(f"元数据优化完成: 重排序排名{rerank_result.get('rerank_rank')}, 重复数量={duplicate_count}")
  570. return final_results
  571. except Exception as e:
  572. self.logger.error(f"多路召回失败: {str(e)}")
  573. return []
  574. # 创建全局召回管理器实例
  575. retrieval_manager = RetrievalManager()