retrieval.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. from typing import List, Dict, Any, Optional
  2. from foundation.ai.models.rerank_model import rerank_model
  3. from foundation.infrastructure.config.config import config_handler
  4. from foundation.observability.logger.loggering import server_logger
  5. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  6. class RetrievalManager:
  7. """
  8. 召回管理器,实现多路召回功能
  9. """
  10. def __init__(self):
  11. """
  12. 初始化召回管理器
  13. """
  14. self.vector_manager = MilvusVectorManager()
  15. self.logger = server_logger
  16. self.dense_weight = config_handler.get('hybrid_search', 'DENSE_WEIGHT', 0.7)
  17. self.sparse_weight = config_handler.get('hybrid_search', 'SPARSE_WEIGHT', 0.3)
  18. def hybrid_search_recall(self, collection_name: str, query_text: str,
  19. top_k: int = 10, ranker_type: str = "weighted",
  20. dense_weight: float = 0.7, sparse_weight: float = 0.3) -> List[Dict[str, Any]]:
  21. """
  22. 混合搜索召回 - 向量+BM25召回
  23. Args:
  24. collection_name: 集合名称
  25. query_text: 查询文本
  26. top_k: 返回结果数量
  27. ranker_type: 重排序类型 "weighted" 或 "rrf"
  28. dense_weight: 密集向量权重
  29. sparse_weight: 稀疏向量权重
  30. Returns:
  31. List[Dict]: 搜索结果列表
  32. """
  33. try:
  34. self.logger.info(f"开始混合检索")
  35. param = {'collection_name': collection_name}
  36. results = self.vector_manager.hybrid_search(
  37. param=param,
  38. query_text=query_text,
  39. top_k=top_k,
  40. ranker_type=ranker_type,
  41. dense_weight=dense_weight,
  42. sparse_weight=sparse_weight
  43. )
  44. self.logger.info(f"混合搜索召回返回 {len(results)} 个结果")
  45. return results
  46. except Exception as e:
  47. self.logger.error(f"混合搜索召回失败: {str(e)}")
  48. return []
  49. def rerank_recall(self, candidates: List[str], query_text: str,
  50. top_k: int = None ) -> List[Dict[str, Any]]:
  51. """
  52. 重排序召回 - 使用BGE重排序模型对候选文档重新排序
  53. Args:
  54. candidates: 候选文档列表
  55. query_text: 查询文本
  56. top_k: 返回结果数量
  57. Returns:
  58. List[Dict]: 重排序后的结果列表,包含原始索引信息
  59. """
  60. try:
  61. self.logger.info(f"开始重排序召回,候选文档数量: {len(candidates)}")
  62. # 调用重排序执行器
  63. rerank_results = rerank_model.bge_rerank(query_text, candidates, top_k)
  64. # 转换结果格式,通过文本匹配找到正确的原始索引
  65. scored_docs = []
  66. for i, api_result in enumerate(rerank_results):
  67. rerank_text = api_result.get('text', '')
  68. rerank_score = float(api_result.get('score', '0.0'))
  69. # 通过文本匹配找到原始在candidates中的索引
  70. original_index = None
  71. for j, candidate_text in enumerate(candidates):
  72. if candidate_text == rerank_text:
  73. original_index = j
  74. break
  75. if original_index is None:
  76. self.logger.warning(f"无法找到重排序结果的原始索引,文本: {rerank_text[:50]}...")
  77. original_index = i # 回退到当前索引
  78. scored_docs.append({
  79. 'text_content': rerank_text,
  80. 'rerank_score': rerank_score,
  81. 'original_index': original_index, # 正确的原始索引
  82. 'rerank_rank': i # 重排序后的排名
  83. })
  84. self.logger.debug(f"重排序结果 {i}: 原始索引={original_index}, 重排序分数={rerank_score}")
  85. self.logger.info(f"重排序召回返回 {len(scored_docs)} 个结果")
  86. return scored_docs
  87. except Exception as e:
  88. self.logger.error(f"重排序召回失败: {str(e)}")
  89. return []
  90. def multi_stage_recall(self, collection_name: str, query_text: str,
  91. hybrid_top_k: int = 50, top_k: int = 3,
  92. ranker_type: str = "weighted") -> List[Dict[str, Any]]:
  93. """
  94. 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
  95. Args:
  96. collection_name: 集合名称
  97. query_text: 查询文本
  98. hybrid_top_k: 混合搜索召回的文档数量
  99. top_k: 最终返回的文档数量
  100. ranker_type: 混合搜索的重排序类型
  101. Returns:
  102. List[Dict]: 重排序后的结果列表,只包含重排序分数
  103. """
  104. try:
  105. self.logger.info(f"执行多路召回")
  106. # 第一阶段:混合搜索召回(向量+BM25)
  107. hybrid_results = self.hybrid_search_recall(
  108. collection_name=collection_name,
  109. query_text=query_text,
  110. top_k=hybrid_top_k,
  111. ranker_type=ranker_type
  112. )
  113. if not hybrid_results:
  114. self.logger.warning("混合搜索召回无结果,返回空列表")
  115. return []
  116. # 提取候选文档文本
  117. candidates = [result['text_content'] for result in hybrid_results]
  118. # 第二阶段:重排序召回
  119. rerank_results = self.rerank_recall(
  120. candidates=candidates,
  121. query_text=query_text,
  122. top_k=top_k
  123. )
  124. # 为重排序结果添加混合搜索的原始元数据,只保留text_content和metadata
  125. final_results = []
  126. for rerank_result in rerank_results:
  127. # 使用正确的原始索引进行元数据映射
  128. original_index = rerank_result.get('original_index', 0)
  129. if original_index < len(hybrid_results):
  130. original_metadata = hybrid_results[original_index].get('metadata', {})
  131. # 只输出text_content和metadata
  132. final_result = {
  133. 'text_content': rerank_result['text_content'],
  134. 'metadata': original_metadata
  135. }
  136. final_results.append(final_result)
  137. self.logger.debug(f"元数据映射成功: 重排序排名{rerank_result.get('rerank_rank')} -> 原始索引{original_index}")
  138. else:
  139. self.logger.warning(f"元数据映射失败: 原始索引{original_index}超出范围(0-{len(hybrid_results)-1})")
  140. self.logger.info(f"多路召回完成,返回 {len(final_results)} 个重排序结果")
  141. return final_results
  142. except Exception as e:
  143. self.logger.error(f"多路召回失败: {str(e)}")
  144. return []
  145. # 创建全局召回管理器实例
  146. retrieval_manager = RetrievalManager()