|
|
@@ -8,11 +8,15 @@
|
|
|
支持的重排序模型:
|
|
|
- BGE Reranker (本地部署)
|
|
|
- Qwen3-Reranker-8B (本地部署)
|
|
|
+- Qwen3-Reranker-8B (蜀天算力)
|
|
|
- Qwen3-Reranker-8B (硅基流动API)
|
|
|
+
|
|
|
+配置加载策略: 懒加载(首次调用时从 config.ini 读取该后端的凭证并缓存)
|
|
|
+路由决策: 由 retrieval.py 通过 model_setting.yaml 的 rerank 功能决定使用哪个后端
|
|
|
"""
|
|
|
import json
|
|
|
import requests
|
|
|
-from typing import List, Dict, Any
|
|
|
+from typing import List, Dict, Any, Optional
|
|
|
from foundation.infrastructure.config.config import config_handler
|
|
|
from foundation.observability.logger.loggering import review_logger as server_logger
|
|
|
|
|
|
@@ -20,67 +24,91 @@ from foundation.observability.logger.loggering import review_logger as server_lo
|
|
|
class LqReranker:
|
|
|
"""
|
|
|
重排序执行器
|
|
|
+
|
|
|
+ 各后端配置按需加载:首次调用某后端时才从 config.ini 读取其凭证,
|
|
|
+ 避免初始化时加载所有 4 个后端的配置。
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
- # BGE Reranker 配置
|
|
|
- self.bge_api_url = config_handler.get('bge_rerank_model', 'BGE_RERANKER_SERVER_URL')
|
|
|
- self.bge_model = config_handler.get('bge_rerank_model', 'BGE_RERANKER_MODEL')
|
|
|
- self.bge_top_k = int(config_handler.get('bge_rerank_model', 'BGE_RERANKER_TOP_N', 10))
|
|
|
-
|
|
|
- # 本地Qwen3-Reranker-8B配置
|
|
|
- self.lq_rerank_api_url = config_handler.get('lq_rerank_model', 'LQ_RERANKER_SERVER_URL')
|
|
|
- self.lq_rerank_model = config_handler.get('lq_rerank_model', 'LQ_RERANKER_MODEL')
|
|
|
- self.lq_rerank_top_k = int(config_handler.get('lq_rerank_model', 'LQ_RERANKER_TOP_N', 10))
|
|
|
-
|
|
|
- # SHUTIAN Qwen3-Reranker-8B配置(蜀天云算力 25426端口)
|
|
|
- self.shutian_rerank_api_url = config_handler.get('shutian', 'SHUTIAN_RERANK_SERVER_URL')
|
|
|
- self.shutian_rerank_model = config_handler.get('shutian', 'SHUTIAN_RERANK_MODEL_ID')
|
|
|
- self.shutian_rerank_api_key = config_handler.get('shutian', 'SHUTIAN_RERANK_API_KEY')
|
|
|
-
|
|
|
- # 硅基流动Qwen3-Reranker-8B配置
|
|
|
- self.silicoflow_rerank_api_url = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_URL', 'https://api.siliconflow.cn/v1/rerank')
|
|
|
- self.silicoflow_rerank_api_key = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_KEY')
|
|
|
- self.silicoflow_rerank_model = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_MODEL', 'Qwen/Qwen3-Reranker-8B')
|
|
|
-
|
|
|
- def bge_rerank(self,query: str, candidates: List[str],top_k :int = None) -> List[Dict[str, Any]]:
|
|
|
+ # 各后端配置缓存(首次调用时加载)
|
|
|
+ self._bge_config: Optional[Dict[str, Any]] = None
|
|
|
+ self._lq_config: Optional[Dict[str, Any]] = None
|
|
|
+ self._shutian_config: Optional[Dict[str, Any]] = None
|
|
|
+ self._silicoflow_config: Optional[Dict[str, Any]] = None
|
|
|
+
|
|
|
+ def _get_bge_config(self) -> Dict[str, Any]:
|
|
|
+ """懒加载 BGE Reranker 配置"""
|
|
|
+ if self._bge_config is None:
|
|
|
+ self._bge_config = {
|
|
|
+ 'api_url': config_handler.get('bge_rerank_model', 'BGE_RERANKER_SERVER_URL'),
|
|
|
+ 'model': config_handler.get('bge_rerank_model', 'BGE_RERANKER_MODEL'),
|
|
|
+ 'top_k': int(config_handler.get('bge_rerank_model', 'BGE_RERANKER_TOP_N', 10)),
|
|
|
+ }
|
|
|
+ return self._bge_config
|
|
|
+
|
|
|
+ def _get_lq_config(self) -> Dict[str, Any]:
|
|
|
+ """懒加载本地 Qwen3-Reranker 配置"""
|
|
|
+ if self._lq_config is None:
|
|
|
+ self._lq_config = {
|
|
|
+ 'api_url': config_handler.get('lq_rerank_model', 'LQ_RERANKER_SERVER_URL'),
|
|
|
+ 'model': config_handler.get('lq_rerank_model', 'LQ_RERANKER_MODEL'),
|
|
|
+ 'top_k': int(config_handler.get('lq_rerank_model', 'LQ_RERANKER_TOP_N', 10)),
|
|
|
+ }
|
|
|
+ return self._lq_config
|
|
|
+
|
|
|
+ def _get_shutian_config(self) -> Dict[str, Any]:
|
|
|
+ """懒加载蜀天 Qwen3-Reranker 配置"""
|
|
|
+ if self._shutian_config is None:
|
|
|
+ self._shutian_config = {
|
|
|
+ 'api_url': config_handler.get('shutian', 'SHUTIAN_RERANK_SERVER_URL'),
|
|
|
+ 'model': config_handler.get('shutian', 'SHUTIAN_RERANK_MODEL_ID'),
|
|
|
+ 'api_key': config_handler.get('shutian', 'SHUTIAN_RERANK_API_KEY'),
|
|
|
+ }
|
|
|
+ return self._shutian_config
|
|
|
+
|
|
|
+ def _get_silicoflow_config(self) -> Dict[str, Any]:
|
|
|
+ """懒加载硅基流动 Qwen3-Reranker 配置"""
|
|
|
+ if self._silicoflow_config is None:
|
|
|
+ self._silicoflow_config = {
|
|
|
+ 'api_url': config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_URL',
|
|
|
+ 'https://api.siliconflow.cn/v1/rerank'),
|
|
|
+ 'api_key': config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_KEY'),
|
|
|
+ 'model': config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_MODEL',
|
|
|
+ 'Qwen/Qwen3-Reranker-8B'),
|
|
|
+ }
|
|
|
+ return self._silicoflow_config
|
|
|
+
|
|
|
+ def bge_rerank(self, query: str, candidates: List[str], top_k: int = None) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
- 执行重排序的全局函数
|
|
|
+ 使用本地 BGE-reranker-v2-m3 进行重排序
|
|
|
|
|
|
Args:
|
|
|
query: 查询文本
|
|
|
candidates: 候选文档列表
|
|
|
- top_k: 调用时chaurnum参数,默认为None
|
|
|
-
|
|
|
+ top_k: 返回前k个结果,默认使用配置文件的top_k
|
|
|
|
|
|
Returns:
|
|
|
List[Dict]: 重排序后的结果列表
|
|
|
"""
|
|
|
try:
|
|
|
- # self.top_k 是config.ini生产环境中实际使用的重排序数量,bge_rerank中的top_k,用于开发环境中快速效果调试
|
|
|
- if not top_k:# 如果开发top_k未指定,则使用配置文件中的top_k
|
|
|
- top_k = self.bge_top_k
|
|
|
-
|
|
|
+ cfg = self._get_bge_config()
|
|
|
+ if not top_k:
|
|
|
+ top_k = cfg['top_k']
|
|
|
|
|
|
server_logger.info(f"开始执行重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
|
|
|
|
|
|
- # 构建重排序请求
|
|
|
rerank_request = {
|
|
|
- "model": self.bge_model,
|
|
|
+ "model": cfg['model'],
|
|
|
"query": query,
|
|
|
- "candidates": candidates
|
|
|
+ "documents": candidates
|
|
|
}
|
|
|
|
|
|
- # 直接调用重排序API
|
|
|
- url = self.bge_api_url
|
|
|
- headers = {
|
|
|
- "Content-Type": "application/json"
|
|
|
- }
|
|
|
+ headers = {"Content-Type": "application/json"}
|
|
|
|
|
|
- server_logger.debug(f"调用重排序API: {url}")
|
|
|
+ server_logger.debug(f"调用重排序API: {cfg['api_url']}")
|
|
|
server_logger.debug(f"请求数据: {json.dumps(rerank_request, ensure_ascii=False)}")
|
|
|
|
|
|
- response = requests.post(url, headers=headers, json=rerank_request, timeout=30)
|
|
|
+ response = requests.post(cfg['api_url'], headers=headers, json=rerank_request, timeout=30)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
@@ -97,7 +125,6 @@ class LqReranker:
|
|
|
|
|
|
except Exception as e:
|
|
|
server_logger.error(f"执行重排序失败: {str(e)}")
|
|
|
- # 返回原始顺序作为fallback
|
|
|
return [{"text": doc, "score": "0.0"} for doc in candidates[:top_k]]
|
|
|
|
|
|
def lq_rerank(self, query: str, candidates: List[str], top_k: int = None) -> List[Dict[str, Any]]:
|
|
|
@@ -111,28 +138,19 @@ class LqReranker:
|
|
|
|
|
|
Returns:
|
|
|
List[Dict[str, Any]]: 重排序后的结果列表
|
|
|
- [
|
|
|
- {
|
|
|
- "text": str, # 文档文本内容
|
|
|
- "score": float, # 相关性得分
|
|
|
- "index": int # 原始索引
|
|
|
- },
|
|
|
- ...
|
|
|
- ]
|
|
|
"""
|
|
|
try:
|
|
|
+ cfg = self._get_lq_config()
|
|
|
if not top_k:
|
|
|
- top_k = self.lq_rerank_top_k
|
|
|
+ top_k = cfg['top_k']
|
|
|
|
|
|
- # 检查query是否为空
|
|
|
if not query or not query.strip():
|
|
|
server_logger.warning(f"本地Qwen3重排序跳过:query为空")
|
|
|
return [{"text": doc, "score": 0.0} for doc in candidates[:top_k]]
|
|
|
|
|
|
server_logger.info(f"开始执行本地Qwen3重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
|
|
|
|
|
|
- # 定义变量(与测试脚本完全一致)
|
|
|
- url = self.lq_rerank_api_url
|
|
|
+ url = cfg['api_url']
|
|
|
prefix = '<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be "yes" or "no".<|im_end|>\n<|im_start|>user\n'
|
|
|
suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
|
|
|
|
|
|
@@ -146,31 +164,24 @@ class LqReranker:
|
|
|
documents = [document_template.format(doc=doc, suffix=suffix) for doc in candidates]
|
|
|
|
|
|
data = {
|
|
|
- "model": self.lq_rerank_model,
|
|
|
+ "model": cfg['model'],
|
|
|
"query": query,
|
|
|
"documents": documents
|
|
|
}
|
|
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
|
-
|
|
|
response = requests.post(url, headers=headers, json=data, timeout=30)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
-
|
|
|
|
|
|
if "results" in result:
|
|
|
- # 格式化结果:将嵌套的 document.text 提取到外层,并清理模板标记
|
|
|
formatted_results = []
|
|
|
for item in result["results"]:
|
|
|
- # 获取包含模板的原始文本
|
|
|
raw_text = item.get("document", {}).get("text", "")
|
|
|
|
|
|
- # 清理模板标记:去除 <Document>: 和 <|im_end|>...assistant 之后的内容
|
|
|
- # 文本格式: <Document>: 原始内容<|im_end|>\n<|im_start|>assistant\n...
|
|
|
if "<Document>:" in raw_text:
|
|
|
- # 提取 <Document>: 和 <|im_end|> 之间的内容
|
|
|
start = raw_text.find("<Document>:") + len("<Document>:")
|
|
|
end = raw_text.find("<|im_end|>")
|
|
|
if end > start:
|
|
|
@@ -179,7 +190,7 @@ class LqReranker:
|
|
|
cleaned_text = raw_text[start:].strip()
|
|
|
else:
|
|
|
cleaned_text = raw_text
|
|
|
-
|
|
|
+
|
|
|
formatted_results.append({
|
|
|
"text": cleaned_text,
|
|
|
"score": float(item.get("relevance_score", 0.0)),
|
|
|
@@ -205,8 +216,9 @@ class LqReranker:
|
|
|
接口为标准 OpenAI 兼容 rerank API,无需模板包装,直接传原始 query/documents
|
|
|
"""
|
|
|
try:
|
|
|
+ cfg = self._get_shutian_config()
|
|
|
if not top_k:
|
|
|
- top_k = self.lq_rerank_top_k
|
|
|
+ top_k = self._get_lq_config()['top_k']
|
|
|
|
|
|
if not query or not query.strip():
|
|
|
server_logger.warning("SHUTIAN重排序跳过:query为空")
|
|
|
@@ -215,7 +227,7 @@ class LqReranker:
|
|
|
server_logger.info(f"开始执行SHUTIAN Qwen3重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
|
|
|
|
|
|
data = {
|
|
|
- "model": self.shutian_rerank_model,
|
|
|
+ "model": cfg['model'],
|
|
|
"query": query,
|
|
|
"documents": candidates,
|
|
|
"top_n": top_k
|
|
|
@@ -223,22 +235,20 @@ class LqReranker:
|
|
|
|
|
|
headers = {
|
|
|
"Content-Type": "application/json",
|
|
|
- "Authorization": f"Bearer {self.shutian_rerank_api_key}"
|
|
|
+ "Authorization": f"Bearer {cfg['api_key']}"
|
|
|
}
|
|
|
|
|
|
- response = requests.post(self.shutian_rerank_api_url, headers=headers, json=data, timeout=30)
|
|
|
+ response = requests.post(cfg['api_url'], headers=headers, json=data, timeout=30)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
|
|
|
- # SHUTIAN API直接返回列表: [{"score": x, "document": "文本", "index": 0}, ...]
|
|
|
results_list = result.get("results", result) if isinstance(result, dict) else result
|
|
|
|
|
|
if isinstance(results_list, list) and results_list:
|
|
|
formatted_results = []
|
|
|
for item in results_list:
|
|
|
doc = item.get("document", "")
|
|
|
- # document 可能是字符串或 {"text": "..."} 对象
|
|
|
text = doc if isinstance(doc, str) else doc.get("text", "")
|
|
|
formatted_results.append({
|
|
|
"text": text,
|
|
|
@@ -266,45 +276,42 @@ class LqReranker:
|
|
|
Args:
|
|
|
query: 查询文本
|
|
|
documents: 文档列表
|
|
|
- top_k: 返回前k个结果,默认使用配置文件的top_k
|
|
|
+ top_k: 返回前k个结果,默认10
|
|
|
instruction: 重排序指令
|
|
|
|
|
|
Returns:
|
|
|
List[Dict]: 重排序后的结果列表,包含 text 和 score
|
|
|
"""
|
|
|
try:
|
|
|
+ cfg = self._get_silicoflow_config()
|
|
|
if not top_k:
|
|
|
- top_k = 10 # 默认值
|
|
|
+ top_k = 10
|
|
|
|
|
|
- if not self.silicoflow_rerank_api_key:
|
|
|
+ if not cfg['api_key']:
|
|
|
server_logger.error("硅基流动 Reranker API Key 未配置")
|
|
|
return []
|
|
|
|
|
|
server_logger.info(f"开始执行硅基流动Qwen3重排序,查询: '{query}', 文档数量: {len(documents)}")
|
|
|
|
|
|
-
|
|
|
- # 构建请求数据
|
|
|
request_data = {
|
|
|
- "model": self.silicoflow_rerank_model,
|
|
|
+ "model": cfg['model'],
|
|
|
"query": query,
|
|
|
"documents": documents,
|
|
|
"instruction": instruction,
|
|
|
"top_n": top_k,
|
|
|
"return_documents": True,
|
|
|
- # "max_chunks_per_doc": 123,
|
|
|
- # "overlap_tokens": 79
|
|
|
}
|
|
|
|
|
|
headers = {
|
|
|
- "Authorization": f"Bearer {self.silicoflow_rerank_api_key}",
|
|
|
+ "Authorization": f"Bearer {cfg['api_key']}",
|
|
|
"Content-Type": "application/json"
|
|
|
}
|
|
|
|
|
|
- server_logger.debug(f"调用硅基流动Qwen3 Reranker API: {self.silicoflow_rerank_api_url}")
|
|
|
+ server_logger.debug(f"调用硅基流动Qwen3 Reranker API: {cfg['api_url']}")
|
|
|
server_logger.debug(f"请求数据: {json.dumps(request_data, ensure_ascii=False)}")
|
|
|
|
|
|
response = requests.post(
|
|
|
- self.silicoflow_rerank_api_url,
|
|
|
+ cfg['api_url'],
|
|
|
headers=headers,
|
|
|
json=request_data,
|
|
|
timeout=30
|
|
|
@@ -315,7 +322,6 @@ class LqReranker:
|
|
|
server_logger.debug(f"硅基流动Qwen3 API响应: {json.dumps(result, ensure_ascii=False)}")
|
|
|
|
|
|
if "results" in result:
|
|
|
- # 格式化结果为统一格式
|
|
|
formatted_results = []
|
|
|
for item in result["results"]:
|
|
|
formatted_results.append({
|
|
|
@@ -334,9 +340,7 @@ class LqReranker:
|
|
|
|
|
|
except Exception as e:
|
|
|
server_logger.error(f"执行硅基流动Qwen3重排序失败: {str(e)}")
|
|
|
- # 返回原始顺序作为fallback
|
|
|
return [{"text": doc, "score": 0.0} for doc in documents[:top_k]]
|
|
|
|
|
|
-rerank_model = LqReranker()
|
|
|
-
|
|
|
|
|
|
+rerank_model = LqReranker()
|