#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Rerank execution module.

Calls a rerank model backend to reorder candidate documents for a query.

Supported rerank backends:
    - BGE Reranker (local deployment)
    - Qwen3-Reranker-8B (local deployment)
    - Qwen3-Reranker-8B (Shutian compute)
    - Qwen3-Reranker-8B (SiliconFlow API)

Configuration loading: lazy. The first call to a backend reads that
backend's settings from config.ini and caches them.

Routing: per the original module notes, retrieval.py selects the backend
through the ``rerank`` feature of model_setting.yaml.
"""

import json
from typing import List, Dict, Any, Optional

import requests

from foundation.infrastructure.config.config import config_handler
from foundation.observability.logger.loggering import review_logger as server_logger

# Timeout (seconds) applied to every rerank HTTP request.
_REQUEST_TIMEOUT_SECONDS = 30

# Fallback result count for the SiliconFlow backend, whose cached
# configuration carries no top-N entry.
_SILICOFLOW_DEFAULT_TOP_K = 10

# Rerank instruction shared by the local Qwen3 backend and, as the default
# argument, by the SiliconFlow backend. Runtime text: do not translate.
_DEFAULT_INSTRUCTION = (
    "请根据桥梁施工建设相关的查询内容,对文档进行重新排序,优先返回与桥梁施工、建设标准、技术规范、质量控制、安全管理等高度相关的文档。"
)

# Chat-template pieces for the locally served Qwen3-Reranker. The
# <Instruct>/<Query>/<Document> tags and the empty <think> block follow the
# prompt format published on the Qwen3-Reranker model card.
# NOTE(review): confirm the local server expects the pre-wrapped format.
_QWEN3_PREFIX = (
    '<|im_start|>system\n'
    'Judge whether the Document meets the requirements based on the Query and the '
    'Instruct provided. Note that the answer can only be "yes" or "no".<|im_end|>\n'
    '<|im_start|>user\n'
)
_QWEN3_SUFFIX = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
_QWEN3_QUERY_TEMPLATE = "{prefix}<Instruct>: {instruction}\n<Query>: {query}\n"
_QWEN3_DOCUMENT_TEMPLATE = "<Document>: {doc}{suffix}"
_QWEN3_DOCUMENT_MARKER = "<Document>:"
_QWEN3_END_MARKER = "<|im_end|>"


class LqReranker:
    """
    Rerank executor.

    Each backend's settings are loaded on demand: they are read from
    config.ini the first time that backend is used, so constructing the
    executor does not load the configuration of all four backends.
    """

    def __init__(self):
        # Per-backend configuration caches, filled on first use.
        self._bge_config: Optional[Dict[str, Any]] = None
        self._lq_config: Optional[Dict[str, Any]] = None
        self._shutian_config: Optional[Dict[str, Any]] = None
        self._silicoflow_config: Optional[Dict[str, Any]] = None

    # ------------------------------------------------------------------
    # Lazy configuration loaders
    # ------------------------------------------------------------------

    def _get_bge_config(self) -> Dict[str, Any]:
        """Return the BGE reranker settings, loading them on first use."""
        if self._bge_config is None:
            section = 'bge_rerank_model'
            self._bge_config = {
                'api_url': config_handler.get(section, 'BGE_RERANKER_SERVER_URL'),
                'model': config_handler.get(section, 'BGE_RERANKER_MODEL'),
                'top_k': int(config_handler.get(section, 'BGE_RERANKER_TOP_N', 10)),
            }
        return self._bge_config

    def _get_lq_config(self) -> Dict[str, Any]:
        """Return the local Qwen3-Reranker settings, loading them on first use."""
        if self._lq_config is None:
            section = 'lq_rerank_model'
            self._lq_config = {
                'api_url': config_handler.get(section, 'LQ_RERANKER_SERVER_URL'),
                'model': config_handler.get(section, 'LQ_RERANKER_MODEL'),
                'top_k': int(config_handler.get(section, 'LQ_RERANKER_TOP_N', 10)),
            }
        return self._lq_config

    def _get_shutian_config(self) -> Dict[str, Any]:
        """Return the Shutian Qwen3-Reranker settings, loading them on first use."""
        if self._shutian_config is None:
            section = 'shutian'
            self._shutian_config = {
                'api_url': config_handler.get(section, 'SHUTIAN_RERANK_SERVER_URL'),
                'model': config_handler.get(section, 'SHUTIAN_RERANK_MODEL_ID'),
                'api_key': config_handler.get(section, 'SHUTIAN_RERANK_API_KEY'),
            }
        return self._shutian_config

    def _get_silicoflow_config(self) -> Dict[str, Any]:
        """Return the SiliconFlow Qwen3-Reranker settings, loading them on first use."""
        if self._silicoflow_config is None:
            section = 'silicoflow_rerank_model'
            self._silicoflow_config = {
                'api_url': config_handler.get(
                    section, 'SILICOFLOW_RERANKER_API_URL', 'https://api.siliconflow.cn/v1/rerank'
                ),
                'api_key': config_handler.get(section, 'SILICOFLOW_RERANKER_API_KEY'),
                'model': config_handler.get(
                    section, 'SILICOFLOW_RERANKER_MODEL', 'Qwen/Qwen3-Reranker-8B'
                ),
            }
        return self._silicoflow_config

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _fallback_results(documents: Optional[List[str]], top_k: Optional[int]) -> List[Dict[str, Any]]:
        """Return the first ``top_k`` documents unranked, each with score 0.0.

        Tolerates ``documents`` being None so that an error handler using
        this helper cannot itself raise. ``top_k`` may be None (no limit).
        """
        return [{"text": doc, "score": 0.0} for doc in (documents or [])[:top_k]]

    @staticmethod
    def _document_text(item: Dict[str, Any]) -> str:
        """Extract the document text from one result entry.

        Accepts both shapes handled in this module: ``"document"`` given as
        a plain string, or as a dict carrying a ``"text"`` field.
        """
        document = item.get("document", "")
        if isinstance(document, dict):
            return document.get("text") or ""
        return document if isinstance(document, str) else ""

    @staticmethod
    def _strip_document_wrapper(raw_text: str) -> str:
        """Remove the Qwen3 document template from text echoed by the server.

        Text without the document marker is returned unchanged.
        """
        marker_pos = raw_text.find(_QWEN3_DOCUMENT_MARKER)
        if marker_pos == -1:
            return raw_text
        start = marker_pos + len(_QWEN3_DOCUMENT_MARKER)
        end = raw_text.find(_QWEN3_END_MARKER, start)
        body = raw_text[start:] if end == -1 else raw_text[start:end]
        return body.strip()

    @staticmethod
    def _format_item(item: Dict[str, Any], text: str) -> Dict[str, Any]:
        """Build the normalized ``text`` / ``score`` / ``index`` result entry."""
        return {
            "text": text,
            # Prefer "relevance_score"; accept "score" as an alternative key.
            "score": float(item.get("relevance_score", item.get("score", 0.0))),
            "index": item.get("index", 0),
        }

    # ------------------------------------------------------------------
    # Backends
    # ------------------------------------------------------------------

    def bge_rerank(self, query: str, candidates: List[str], top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Rerank with the locally deployed BGE-reranker-v2-m3.

        Args:
            query: Query text.
            candidates: Candidate documents.
            top_k: Number of results to return; a falsy value selects the
                configured top_k.

        Returns:
            List[Dict]: The first ``top_k`` entries of the API's ``results``
            field, passed through unmodified. An empty list on a non-200
            status or when ``results`` is absent. If an exception occurs,
            the first ``top_k`` candidates as ``{"text", "score": 0.0}``.
        """
        try:
            cfg = self._get_bge_config()
            if not top_k:
                top_k = cfg['top_k']

            server_logger.info(f"开始执行重排序,查询: '{query}', 候选文档数量: {len(candidates)}")

            payload = {
                "model": cfg['model'],
                "query": query,
                "documents": candidates,
            }
            server_logger.debug(f"调用重排序API: {cfg['api_url']}")
            server_logger.debug(f"请求数据: {json.dumps(payload, ensure_ascii=False)}")

            response = requests.post(
                cfg['api_url'],
                headers={"Content-Type": "application/json"},
                json=payload,
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}")
                return []

            result = response.json()
            server_logger.debug(f"API响应: {json.dumps(result, ensure_ascii=False)}")
            if "results" not in result:
                server_logger.warning(f"API响应格式异常: {result}")
                return []
            return result["results"][:top_k]

        except Exception as e:
            server_logger.error(f"执行重排序失败: {str(e)}")
            # Float score, matching the fallback of every other backend.
            return self._fallback_results(candidates, top_k)

    def lq_rerank(self, query: str, candidates: List[str], top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Rerank with the locally deployed Qwen3-Reranker-8B.

        The query and each document are wrapped in the Qwen3-Reranker chat
        template before being sent; the wrapper is stripped again from the
        document text echoed back in the response.

        Args:
            query: Query text.
            candidates: Candidate documents.
            top_k: Number of results to return; a falsy value selects the
                configured top_k.

        Returns:
            List[Dict[str, Any]]: Up to ``top_k`` entries with ``text``,
            ``score`` and ``index``. An empty list on a non-200 status or
            when ``results`` is absent. For a blank query or on exception,
            the first ``top_k`` candidates as ``{"text", "score": 0.0}``.
        """
        try:
            cfg = self._get_lq_config()
            if not top_k:
                top_k = cfg['top_k']

            if not query or not query.strip():
                server_logger.warning("本地Qwen3重排序跳过:query为空")
                return self._fallback_results(candidates, top_k)

            server_logger.info(f"开始执行本地Qwen3重排序,查询: '{query}', 候选文档数量: {len(candidates)}")

            wrapped_query = _QWEN3_QUERY_TEMPLATE.format(
                prefix=_QWEN3_PREFIX, instruction=_DEFAULT_INSTRUCTION, query=query
            )
            wrapped_documents = [
                _QWEN3_DOCUMENT_TEMPLATE.format(doc=doc, suffix=_QWEN3_SUFFIX) for doc in candidates
            ]
            payload = {
                "model": cfg['model'],
                "query": wrapped_query,
                "documents": wrapped_documents,
            }

            response = requests.post(
                cfg['api_url'],
                headers={"Content-Type": "application/json"},
                json=payload,
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}")
                return []

            result = response.json()
            if "results" not in result:
                server_logger.warning(f"API响应格式异常: {result}")
                return []

            formatted_results = [
                self._format_item(item, self._strip_document_wrapper(self._document_text(item)))
                for item in result["results"]
            ]
            server_logger.info(f"本地Qwen3 API响应: {formatted_results[:top_k]}")
            return formatted_results[:top_k]

        except Exception as e:
            server_logger.error(f"执行本地Qwen3重排序失败: {str(e)}")
            return self._fallback_results(candidates, top_k)

    def shutian_rerank(self, query: str, candidates: List[str], top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Rerank with the Qwen3-Reranker-8B deployed on Shutian cloud compute
        (port 25426 per the original notes).

        Per the original notes the endpoint is a standard OpenAI-compatible
        rerank API, so the raw query and documents are sent without any
        template wrapping.

        Args:
            query: Query text.
            candidates: Candidate documents.
            top_k: Number of results to return; a falsy value selects the
                top_k of the local Qwen3 backend's configuration.

        Returns:
            List[Dict[str, Any]]: Up to ``top_k`` entries with ``text``,
            ``score`` and ``index``. An empty list on a non-200 status or
            when no result list is found. For a blank query or on
            exception, the first ``top_k`` candidates as
            ``{"text", "score": 0.0}``.
        """
        try:
            cfg = self._get_shutian_config()
            if not top_k:
                # The Shutian settings loaded above contain no top-N entry,
                # so the default comes from the local Qwen3 configuration.
                # NOTE(review): confirm this cross-backend default is intended.
                top_k = self._get_lq_config()['top_k']

            if not query or not query.strip():
                server_logger.warning("SHUTIAN重排序跳过:query为空")
                return self._fallback_results(candidates, top_k)

            server_logger.info(f"开始执行SHUTIAN Qwen3重排序,查询: '{query}', 候选文档数量: {len(candidates)}")

            payload = {
                "model": cfg['model'],
                "query": query,
                "documents": candidates,
                "top_n": top_k,
            }
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {cfg['api_key']}",
            }

            response = requests.post(
                cfg['api_url'], headers=headers, json=payload, timeout=_REQUEST_TIMEOUT_SECONDS
            )
            if response.status_code != 200:
                server_logger.error(f"SHUTIAN API调用失败,状态码: {response.status_code}, 响应: {response.text}")
                return []

            result = response.json()
            # The body may be {"results": [...]} or a bare list of entries.
            results_list = result.get("results", result) if isinstance(result, dict) else result
            if not isinstance(results_list, list) or not results_list:
                server_logger.warning(f"SHUTIAN API响应格式异常: {result}")
                return []

            formatted_results = [
                self._format_item(item, self._document_text(item)) for item in results_list
            ]
            server_logger.info(f"SHUTIAN Qwen3重排序完成,返回 {len(formatted_results)} 个结果")
            return formatted_results[:top_k]

        except Exception as e:
            server_logger.error(f"执行SHUTIAN Qwen3重排序失败: {str(e)}")
            return self._fallback_results(candidates, top_k)

    def qwen3_rerank(self, query: str, documents: List[str], top_k: Optional[int] = None,
                     instruction: str = _DEFAULT_INSTRUCTION) -> List[Dict[str, Any]]:
        """
        Rerank with the SiliconFlow Qwen3-Reranker-8B API.

        Args:
            query: Query text.
            documents: Documents to rerank.
            top_k: Number of results to return; a falsy value selects 10.
            instruction: Rerank instruction sent with the request.

        Returns:
            List[Dict]: Up to ``top_k`` entries with ``text``, ``score`` and
            ``index``. An empty list when the API key is not configured, on
            a non-200 status, or when ``results`` is absent. If an exception
            occurs, the first ``top_k`` documents as
            ``{"text", "score": 0.0}``.
        """
        try:
            cfg = self._get_silicoflow_config()
            if not top_k:
                top_k = _SILICOFLOW_DEFAULT_TOP_K

            if not cfg['api_key']:
                server_logger.error("硅基流动 Reranker API Key 未配置")
                return []

            server_logger.info(f"开始执行硅基流动Qwen3重排序,查询: '{query}', 文档数量: {len(documents)}")

            request_data = {
                "model": cfg['model'],
                "query": query,
                "documents": documents,
                "instruction": instruction,
                "top_n": top_k,
                "return_documents": True,
            }
            headers = {
                "Authorization": f"Bearer {cfg['api_key']}",
                "Content-Type": "application/json",
            }

            server_logger.debug(f"调用硅基流动Qwen3 Reranker API: {cfg['api_url']}")
            server_logger.debug(f"请求数据: {json.dumps(request_data, ensure_ascii=False)}")

            response = requests.post(
                cfg['api_url'], headers=headers, json=request_data, timeout=_REQUEST_TIMEOUT_SECONDS
            )
            if response.status_code != 200:
                server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}")
                return []

            result = response.json()
            server_logger.debug(f"硅基流动Qwen3 API响应: {json.dumps(result, ensure_ascii=False)}")
            if "results" not in result:
                server_logger.warning(f"API响应格式异常: {result}")
                return []

            formatted_results = [
                self._format_item(item, self._document_text(item)) for item in result["results"]
            ]
            return formatted_results[:top_k]

        except Exception as e:
            server_logger.error(f"执行硅基流动Qwen3重排序失败: {str(e)}")
            return self._fallback_results(documents, top_k)


# Module-level shared reranker instance.
rerank_model = LqReranker()