retrieval.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from typing import List, Dict, Any, Optional
  2. from foundation.ai.models.rerank_model import rerank_model
  3. from foundation.infrastructure.config.config import config_handler
  4. from foundation.observability.logger.loggering import server_logger
  5. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  6. class RetrievalManager:
  7. """
  8. 召回管理器,实现多路召回功能
  9. """
  10. def __init__(self):
  11. """
  12. 初始化召回管理器
  13. """
  14. self.vector_manager = MilvusVectorManager()
  15. self.logger = server_logger
  16. self.dense_weight = config_handler.get('hybrid_search', 'DENSE_WEIGHT', 0.7)
  17. self.sparse_weight = config_handler.get('hybrid_search', 'SPARSE_WEIGHT', 0.3)
  18. def hybrid_search_recall(self, collection_name: str, query_text: str,
  19. top_k: int = 10, ranker_type: str = "weighted",
  20. dense_weight: float = 0.7, sparse_weight: float = 0.3) -> List[Dict[str, Any]]:
  21. """
  22. 混合搜索召回 - 向量+BM25召回
  23. Args:
  24. collection_name: 集合名称
  25. query_text: 查询文本
  26. top_k: 返回结果数量
  27. ranker_type: 重排序类型 "weighted" 或 "rrf"
  28. dense_weight: 密集向量权重
  29. sparse_weight: 稀疏向量权重
  30. Returns:
  31. List[Dict]: 搜索结果列表
  32. """
  33. try:
  34. self.logger.info(f"开始混合搜索召回,查询: {query_text}")
  35. param = {'collection_name': collection_name}
  36. results = self.vector_manager.hybrid_search(
  37. param=param,
  38. query_text=query_text,
  39. top_k=top_k,
  40. ranker_type=ranker_type,
  41. dense_weight=dense_weight,
  42. sparse_weight=sparse_weight
  43. )
  44. self.logger.info(f"混合搜索召回返回 {len(results)} 个结果")
  45. return results
  46. except Exception as e:
  47. self.logger.error(f"混合搜索召回失败: {str(e)}")
  48. return []
  49. def rerank_recall(self, candidates: List[str], query_text: str,
  50. top_k: int = None ) -> List[Dict[str, Any]]:
  51. """
  52. 重排序召回 - 使用BGE重排序模型对候选文档重新排序
  53. Args:
  54. candidates: 候选文档列表
  55. query_text: 查询文本
  56. top_k: 返回结果数量
  57. Returns:
  58. List[Dict]: 重排序后的结果列表
  59. """
  60. try:
  61. self.logger.info(f"开始重排序召回,候选文档数量: {len(candidates)}")
  62. # 调用重排序执行器
  63. rerank_results = rerank_model.bge_rerank(query_text, candidates, top_k)
  64. # 转换结果格式
  65. scored_docs = []
  66. for i, result in enumerate(rerank_results):
  67. scored_docs.append({
  68. 'text_content': result.get('text', ''),
  69. 'rerank_score': float(result.get('score', '0.0')),
  70. 'index': i
  71. })
  72. self.logger.info(f"重排序召回返回 {len(scored_docs)} 个结果")
  73. return scored_docs
  74. except Exception as e:
  75. self.logger.error(f"重排序召回失败: {str(e)}")
  76. return []
  77. def multi_stage_recall(self, collection_name: str, query_text: str,
  78. hybrid_top_k: int = 50, top_k: int = 10,
  79. ranker_type: str = "weighted") -> List[Dict[str, Any]]:
  80. """
  81. 多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
  82. Args:
  83. collection_name: 集合名称
  84. query_text: 查询文本
  85. hybrid_top_k: 混合搜索召回的文档数量
  86. top_k: 最终返回的文档数量
  87. ranker_type: 混合搜索的重排序类型
  88. Returns:
  89. List[Dict]: 重排序后的结果列表,只包含重排序分数
  90. """
  91. try:
  92. self.logger.info(f"开始多路召回,查询: {query_text}")
  93. # 第一阶段:混合搜索召回(向量+BM25)
  94. hybrid_results = self.hybrid_search_recall(
  95. collection_name=collection_name,
  96. query_text=query_text,
  97. top_k=hybrid_top_k,
  98. ranker_type=ranker_type
  99. )
  100. if not hybrid_results:
  101. self.logger.warning("混合搜索召回无结果,返回空列表")
  102. return []
  103. # 提取候选文档文本
  104. candidates = [result['text_content'] for result in hybrid_results]
  105. # 第二阶段:重排序召回
  106. rerank_results = self.rerank_recall(
  107. candidates=candidates,
  108. query_text=query_text,
  109. top_k=top_k
  110. )
  111. # 为重排序结果添加混合搜索的原始元数据
  112. for rerank_result in rerank_results:
  113. original_index = rerank_result.get('index', 0)
  114. if original_index < len(hybrid_results):
  115. original_metadata = hybrid_results[original_index].get('metadata', {})
  116. rerank_result['metadata'] = original_metadata
  117. self.logger.info(f"多路召回完成,返回 {len(rerank_results)} 个重排序结果")
  118. return rerank_results
  119. except Exception as e:
  120. self.logger.error(f"多路召回失败: {str(e)}")
  121. return []
# Module-level shared RetrievalManager instance, created at import time.
retrieval_manager = RetrievalManager()