#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
重排序执行模块
用于调用重排序模型进行文档重排序
支持的重排序模型:
- BGE Reranker (本地部署)
- Qwen3-Reranker-8B (本地部署)
- Qwen3-Reranker-8B (硅基流动API)
"""
import json
import requests
from typing import List, Dict, Any
from foundation.infrastructure.config.config import config_handler
from foundation.observability.logger.loggering import server_logger
class LqReranker:
    """HTTP client for the three rerank back-ends configured for this service.

    Back-ends (all settings are read from ``config_handler`` in ``__init__``):

    - ``bge_rerank``: BGE reranker server.
    - ``lq_rerank``: locally deployed Qwen3-Reranker-8B.
    - ``qwen3_rerank``: Qwen3-Reranker-8B through the SiliconFlow API.

    Failure contract shared by the three public methods:

    - empty or ``None`` document list -> ``[]`` without any network call;
    - non-200 status, or a response without a ``"results"`` key -> ``[]``;
    - any exception (timeout, connection error, invalid JSON, ...) -> the
      first ``top_k`` input documents in their original order, each with
      a float ``score`` of 0.0.
    """

    # Timeout, in seconds, applied to every rerank HTTP request.
    REQUEST_TIMEOUT = 30

    # top_k used by qwen3_rerank when the caller does not provide one.
    SILICONFLOW_DEFAULT_TOP_K = 10

    # Rerank instruction shared by lq_rerank and (as default) qwen3_rerank.
    DEFAULT_INSTRUCTION = (
        "请根据桥梁施工建设相关的查询内容,对文档进行重新排序,优先返回与桥梁施工、建设标准、技术规范、质量控制、安全管理等高度相关的文档。"
    )

    # Chat-template fragments wrapped around the query and every document
    # sent to the local Qwen3 reranker. They are transmitted verbatim.
    # NOTE(review): the upstream Qwen3-Reranker usage example labels these
    # segments with <Instruct>/<Query>/<Document> tags and has a <think>
    # block in the assistant suffix; the strings below only carry the bare
    # colons and blank lines. Confirm against the deployed server before
    # changing anything here.
    _CHAT_PREFIX = '<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be "yes" or "no".<|im_end|>\n<|im_start|>user\n'
    _CHAT_SUFFIX = "<|im_end|>\n<|im_start|>assistant\n\n\n\n\n"
    _QUERY_TEMPLATE = "{prefix}: {instruction}\n: {query}\n"
    _DOCUMENT_TEMPLATE = ": {doc}{suffix}"

    # Markers used to undo _DOCUMENT_TEMPLATE on texts echoed by the server.
    _DOCUMENT_MARKER = ":"
    _END_MARKER = "<|im_end|>"

    def __init__(self):
        """Load the URL, model name and top-n of each back-end from config."""
        # BGE reranker settings
        self.bge_api_url = config_handler.get('bge_rerank_model', 'BGE_RERANKER_SERVER_URL')
        self.bge_model = config_handler.get('bge_rerank_model', 'BGE_RERANKER_MODEL')
        self.bge_top_k = int(config_handler.get('bge_rerank_model', 'BGE_RERANKER_TOP_N', 10))
        # Local Qwen3-Reranker-8B settings
        self.lq_rerank_api_url = config_handler.get('lq_rerank_model', 'LQ_RERANKER_SERVER_URL')
        self.lq_rerank_model = config_handler.get('lq_rerank_model', 'LQ_RERANKER_MODEL')
        self.lq_rerank_top_k = int(config_handler.get('lq_rerank_model', 'LQ_RERANKER_TOP_N', 10))
        # SiliconFlow Qwen3-Reranker-8B settings
        self.silicoflow_rerank_api_url = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_URL', 'https://api.siliconflow.cn/v1/rerank')
        self.silicoflow_rerank_api_key = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_KEY')
        self.silicoflow_rerank_model = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_MODEL', 'Qwen/Qwen3-Reranker-8B')

    @staticmethod
    def _fallback_results(docs: List[str], top_k: int) -> List[Dict[str, Any]]:
        """Return the first ``top_k`` documents unranked, each scored 0.0.

        ``docs`` may be ``None``; the score is always a float so every
        back-end reports the same type on its fallback path.
        """
        return [{"text": doc, "score": 0.0} for doc in (docs or [])[:top_k]]

    @classmethod
    def _strip_document_template(cls, raw_text: str) -> str:
        """Undo ``_DOCUMENT_TEMPLATE`` on a text echoed back by the server.

        Only a text that actually starts with the template's leading colon
        is treated as templated; anything else is returned untouched, so a
        plain document that merely contains a colon is not truncated.
        """
        if not raw_text.startswith(cls._DOCUMENT_MARKER):
            return raw_text
        body = raw_text[len(cls._DOCUMENT_MARKER):]
        # The suffix is appended last, so its marker is the final occurrence.
        marker_pos = body.rfind(cls._END_MARKER)
        if marker_pos != -1:
            body = body[:marker_pos]
        return body.strip()

    @classmethod
    def _format_results(cls, items: List[Dict[str, Any]], strip_template: bool = False) -> List[Dict[str, Any]]:
        """Flatten API result items into ``{"text", "score", "index"}`` dicts.

        Args:
            items: entries of the response's ``"results"`` list.
            strip_template: when True, remove the chat-template wrapper
                from each returned document text.
        """
        formatted = []
        for item in items:
            # "document" can be missing or null; treat both as "no text".
            document = item.get("document") or {}
            text = document.get("text") or ""
            if strip_template:
                text = cls._strip_document_template(text)
            formatted.append({
                "text": text,
                "score": float(item.get("relevance_score") or 0.0),
                "index": item.get("index", 0),
            })
        return formatted

    def _fetch_results(self, url: str, payload: Dict[str, Any], headers: Dict[str, str], response_label: str = ""):
        """POST ``payload`` as JSON and return the response's ``"results"``.

        Returns ``None`` (after logging) when the status is not 200 or the
        body has no ``"results"`` key. Network and JSON-decoding errors are
        not caught here; they propagate to the calling method.

        Args:
            response_label: when non-empty, the decoded response is logged
                at debug level under this label.
        """
        response = requests.post(url, headers=headers, json=payload, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}")
            return None
        result = response.json()
        if response_label:
            server_logger.debug(f"{response_label}: {json.dumps(result, ensure_ascii=False)}")
        if "results" not in result:
            server_logger.warning(f"API响应格式异常: {result}")
            return None
        return result["results"]

    def bge_rerank(self, query: str, candidates: List[str], top_k: int = None) -> List[Dict[str, Any]]:
        """Rerank ``candidates`` against ``query`` with the BGE reranker.

        Args:
            query: query text.
            candidates: candidate documents.
            top_k: number of results to return. A falsy value selects the
                configured ``BGE_RERANKER_TOP_N``; passing it explicitly is
                meant for quick tuning during development.

        Returns:
            The server's ``"results"`` entries, unchanged, truncated to
            ``top_k``. See the class docstring for the failure contract.
        """
        if not candidates:
            # Nothing to rank: skip the network round trip.
            return []
        try:
            if not top_k:
                top_k = self.bge_top_k
            server_logger.info(f"开始执行重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
            rerank_request = {
                "model": self.bge_model,
                "query": query,
                "candidates": candidates
            }
            url = self.bge_api_url
            headers = {"Content-Type": "application/json"}
            server_logger.debug(f"调用重排序API: {url}")
            server_logger.debug(f"请求数据: {json.dumps(rerank_request, ensure_ascii=False)}")
            results = self._fetch_results(url, rerank_request, headers, "API响应")
            if results is None:
                return []
            return results[:top_k]
        except Exception as e:
            server_logger.error(f"执行重排序失败: {str(e)}")
            # Fall back to the original order.
            return self._fallback_results(candidates, top_k)

    def lq_rerank(self, query: str, candidates: List[str], top_k: int = None) -> List[Dict[str, Any]]:
        """Rerank ``candidates`` with the locally deployed Qwen3-Reranker-8B.

        The query and each document are wrapped in the chat-template
        fragments defined on the class before being sent; the wrapper is
        removed again from the texts the server returns.

        Args:
            query: query text. When empty or whitespace-only, the call is
                skipped and the first ``top_k`` candidates are returned
                with score 0.0.
            candidates: candidate documents.
            top_k: number of results to return. A falsy value selects the
                configured ``LQ_RERANKER_TOP_N``.

        Returns:
            A list of ``{"text": str, "score": float, "index": int}``
            dicts, where ``index`` is the value reported by the server
            for the document. See the class docstring for the failure
            contract.
        """
        if not candidates:
            # Nothing to rank: skip the network round trip.
            return []
        try:
            if not top_k:
                top_k = self.lq_rerank_top_k
            if not query or not query.strip():
                server_logger.warning("本地Qwen3重排序跳过:query为空")
                return self._fallback_results(candidates, top_k)
            server_logger.info(f"开始执行本地Qwen3重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
            prompt = self._QUERY_TEMPLATE.format(
                prefix=self._CHAT_PREFIX,
                instruction=self.DEFAULT_INSTRUCTION,
                query=query,
            )
            wrapped_docs = [
                self._DOCUMENT_TEMPLATE.format(doc=doc, suffix=self._CHAT_SUFFIX)
                for doc in candidates
            ]
            data = {
                "model": self.lq_rerank_model,
                "query": prompt,
                "documents": wrapped_docs
            }
            headers = {"Content-Type": "application/json"}
            results = self._fetch_results(self.lq_rerank_api_url, data, headers)
            if results is None:
                return []
            top_results = self._format_results(results, strip_template=True)[:top_k]
            server_logger.info(f"本地Qwen3 API响应: {top_results}")
            return top_results
        except Exception as e:
            server_logger.error(f"执行本地Qwen3重排序失败: {str(e)}")
            return self._fallback_results(candidates, top_k)

    def qwen3_rerank(self, query: str, documents: List[str], top_k: int = None,
                     instruction: str = DEFAULT_INSTRUCTION) -> List[Dict[str, Any]]:
        """Rerank ``documents`` with Qwen3-Reranker-8B via the SiliconFlow API.

        Args:
            query: query text.
            documents: documents to rank.
            top_k: number of results to return; a falsy value selects
                ``SILICONFLOW_DEFAULT_TOP_K`` (10). It is also sent to the
                API as ``top_n``.
            instruction: rerank instruction forwarded to the API.

        Returns:
            A list of ``{"text": str, "score": float, "index": int}``
            dicts. Returns ``[]`` when no API key is configured; see the
            class docstring for the rest of the failure contract.
        """
        if not documents:
            # Nothing to rank: skip the network round trip.
            return []
        try:
            if not top_k:
                top_k = self.SILICONFLOW_DEFAULT_TOP_K
            if not self.silicoflow_rerank_api_key:
                server_logger.error("硅基流动 Reranker API Key 未配置")
                return []
            server_logger.info(f"开始执行硅基流动Qwen3重排序,查询: '{query}', 文档数量: {len(documents)}")
            request_data = {
                "model": self.silicoflow_rerank_model,
                "query": query,
                "documents": documents,
                "instruction": instruction,
                "top_n": top_k,
                "return_documents": True,
            }
            headers = {
                "Authorization": f"Bearer {self.silicoflow_rerank_api_key}",
                "Content-Type": "application/json"
            }
            server_logger.debug(f"调用硅基流动Qwen3 Reranker API: {self.silicoflow_rerank_api_url}")
            server_logger.debug(f"请求数据: {json.dumps(request_data, ensure_ascii=False)}")
            results = self._fetch_results(
                self.silicoflow_rerank_api_url,
                request_data,
                headers,
                "硅基流动Qwen3 API响应",
            )
            if results is None:
                return []
            return self._format_results(results)[:top_k]
        except Exception as e:
            server_logger.error(f"执行硅基流动Qwen3重排序失败: {str(e)}")
            # Fall back to the original order.
            return self._fallback_results(documents, top_k)
# Module-level shared reranker instance. Constructing it runs
# LqReranker.__init__, which reads the rerank settings from config_handler
# at import time.
rerank_model = LqReranker()