#!/usr/bin/env python # -*- coding: utf-8 -*- """ 重排序执行模块 用于调用重排序模型进行文档重排序 支持的重排序模型: - BGE Reranker (本地部署) - Qwen3-Reranker-8B (硅基流动API) """ import json import requests from typing import List, Dict, Any from foundation.infrastructure.config.config import config_handler from foundation.observability.logger.loggering import server_logger class LqReranker: """ 重排序执行器 """ def __init__(self): self.api_url = config_handler.get('rerank_model', 'BGE_RERANKER_SERVER_RUL') self.model = config_handler.get('rerank_model', 'BGE_RERANKER_MODEL_ID') # 确保top_k是整数类型,避免切片错误 self.top_k = int(config_handler.get('rerank_model', 'BGE_RERANKER_TOP_N', 3)) # Qwen3-Reranker-8B 配置 self.qwen_api_url = config_handler.get('rerank_model_qwen', 'QWEN_RERANKER_API_URL', 'https://api.siliconflow.cn/v1/rerank') self.qwen_api_key = config_handler.get('rerank_model_qwen', 'QWEN_RERANKER_API_KEY') self.qwen_model = config_handler.get('rerank_model_qwen', 'QWEN_RERANKER_MODEL', 'Qwen/Qwen3-Reranker-8B') def bge_rerank(self,query: str, candidates: List[str],top_k :int = None) -> List[Dict[str, Any]]: """ 执行重排序的全局函数 Args: query: 查询文本 candidates: 候选文档列表 top_k: 调用时chaurnum参数,默认为None Returns: List[Dict]: 重排序后的结果列表 """ try: # self.top_k 是config.ini生产环境中实际使用的重排序数量,bge_rerank中的top_k,用于开发环境中快速效果调试 if not top_k:# 如果开发top_k未指定,则使用配置文件中的top_k top_k = self.top_k server_logger.info(f"开始执行重排序,查询: '{query}', 候选文档数量: {len(candidates)}") # 构建重排序请求 rerank_request = { "model": "bge-reranker-v2-m3", "query": query, "candidates": candidates } # 直接调用重排序API url = self.api_url headers = { "Content-Type": "application/json" } server_logger.debug(f"调用重排序API: {url}") server_logger.debug(f"请求数据: {json.dumps(rerank_request, ensure_ascii=False)}") response = requests.post(url, headers=headers, json=rerank_request, timeout=30) if response.status_code == 200: result = response.json() server_logger.debug(f"API响应: {json.dumps(result, ensure_ascii=False)}") if "results" in result: return result["results"][:top_k] else: server_logger.warning(f"API响应格式异常: {result}") return [] else: server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}") return [] except Exception as e: server_logger.error(f"执行重排序失败: {str(e)}") # 返回原始顺序作为fallback return [{"text": doc, "score": "0.0"} for doc in candidates[:top_k]] def qwen3_rerank(self, query: str, documents: List[str], top_k: int = None, instruction: str = "请根据桥梁施工建设相关的查询内容,对文档进行重新排序,优先返回与桥梁施工、建设标准、技术规范、质量控制、安全管理等高度相关的文档。") -> List[Dict[str, Any]]: """ 使用 Qwen3-Reranker-8B 进行重排序 Args: query: 查询文本 documents: 文档列表 top_k: 返回前k个结果,默认使用配置文件的top_k instruction: 重排序指令 Returns: List[Dict]: 重排序后的结果列表,包含 text 和 score """ try: if not top_k: top_k = self.top_k if not self.qwen_api_key: server_logger.error("Qwen Reranker API Key 未配置") return [] server_logger.info(f"开始执行Qwen3重排序,查询: '{query}', 文档数量: {len(documents)}") # 构建请求数据 request_data = { "model": self.qwen_model, "query": query, "documents": documents, "instruction": instruction, "top_n": top_k, "return_documents": True, "max_chunks_per_doc": 123, "overlap_tokens": 79 } headers = { "Authorization": f"Bearer {self.qwen_api_key}", "Content-Type": "application/json" } server_logger.debug(f"调用Qwen3 Reranker API: {self.qwen_api_url}") server_logger.debug(f"请求数据: {json.dumps(request_data, ensure_ascii=False)}") response = requests.post( self.qwen_api_url, headers=headers, json=request_data, timeout=30 ) if response.status_code == 200: result = response.json() server_logger.debug(f"Qwen3 API响应: {json.dumps(result, ensure_ascii=False)}") if "results" in result: # 格式化结果为统一格式 formatted_results = [] for item in result["results"]: formatted_results.append({ "text": item.get("document", {}).get("text", ""), "score": float(item.get("relevance_score", 0.0)), "index": item.get("index", 0) }) return formatted_results[:top_k] else: server_logger.warning(f"Qwen3 API响应格式异常: {result}") return [] else: server_logger.error(f"Qwen3 API调用失败,状态码: {response.status_code}, 响应: {response.text}") return [] except Exception as e: server_logger.error(f"执行Qwen3重排序失败: {str(e)}") # 返回原始顺序作为fallback return [{"text": doc, "score": 0.0, "index": i} for i, doc in enumerate(documents[:top_k])] rerank_model = LqReranker()