Browse Source

v0.0.4-功能优化
- 增加lq_rerank_model
- 实现专业性审查实体召回审查链路

WangXuMing 2 weeks ago
parent
commit
7219237601

+ 1 - 1
README.md

@@ -13,7 +13,7 @@
     - python .\views\construction_review\app.py
 
 
-    
+  
 
 
 

+ 2 - 0
README_test.md

@@ -0,0 +1,2 @@
+# 测试模块运行环境配置
+$env:PYTHONPATH = "D:\wx_work\sichuan_luqiao\LQAgentPlatform"

+ 12 - 3
core/construction_review/component/__init__.py

@@ -4,11 +4,20 @@
 """
 
 from .document_processor import DocumentProcessor
-from .ai_review_engine import AIReviewEngine
-from .report_generator import ReportGenerator
 
 __all__ = [
     'DocumentProcessor',
     'AIReviewEngine',
     'ReportGenerator'
-]
+]
+
+# 延迟导入 AIReviewEngine 和 ReportGenerator,避免在包导入时触发向量库初始化大幅减少模块加载时的依赖
+def __getattr__(name: str):
+    """延迟导入支持(Python 3.7+)"""
+    if name == 'AIReviewEngine':
+        from .ai_review_engine import AIReviewEngine
+        return AIReviewEngine
+    elif name == 'ReportGenerator':
+        from .report_generator import ReportGenerator
+        return ReportGenerator
+    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

+ 105 - 103
core/construction_review/component/ai_review_engine.py

@@ -52,12 +52,17 @@ import asyncio
 from enum import Enum
 from dataclasses import dataclass
 from typing import Dict, List, Any
-from foundation.ai.rag.retrieval.retrieval import retrieval_manager
-from core.base.task_models import TaskFileInfo  # ✅ 导入 TaskFileInfo
+from core.base.task_models import TaskFileInfo  
 from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import server_logger as logger
+from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
+from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
 from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
 from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer
+
+
+
+
 @dataclass
 class ReviewResult:
     """审查结果"""
@@ -164,89 +169,6 @@ class AIReviewEngine(BaseReviewer):
         return wrapped_check()
     
 
-    async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],
-                                   state:dict,stage_name:str) -> Dict[str, Any]:
-        """
-        大纲审查
-
-        Args:
-            trace_id_idx: 追踪ID索引
-            outline_content: 大纲内容
-            state: 状态
-            stage_name: 阶段名称
-        """
-        logger.info(f"开始大纲审查,trace_id: {trace_id_idx}")
-
-        # 1. 获取整体大纲(1级大纲目录)
-        overall_outline = ""
-
-        # 添加调试信息
-        logger.info(f"outline_content结构: {list(outline_content.keys()) if outline_content else 'None'}")
-        outline_data = outline_content.get('outline', {})
-        logger.info(f"outline_data结构: {list(outline_data.keys()) if outline_data else 'None'}")
-        chapters = outline_data.get('chapters', [])
-        logger.info(f"chapters数量: {len(chapters)}")
-
-        for i, chapter in enumerate(chapters):
-            chapter_title = chapter.get('title', 'N/A')
-            chapter_page = chapter.get('page', 'N/A')
-            logger.info(f"章节{i+1}: {chapter_title} (页码: {chapter_page})")
-            overall_outline += f"{chapter_title} (页码: {chapter_page})\n"
-
-        logger.info(f"生成的overall_outline长度: {len(overall_outline)}")
-        if overall_outline:
-            logger.info(f"overall_outline内容: {overall_outline[:200]}...")
-
-        # 2. 获取大纲各章节及其子目录的详细信息
-        detailed_outline = []
-
-        for chapter in chapters:
-            # 将每个章节作为整体项,包含标题、页码和子目录
-            chapter_content = f"\n{chapter['title']} (页码: {chapter['page']})\n"
-
-            # 添加子目录(如果有)
-            subsections = chapter.get('subsections', [])
-            if subsections:
-                chapter_content += "包含子目录:\n"
-                for subsection in subsections:
-                    indent = "  " * (subsection['level'] - 1)
-                    chapter_content += f"{indent}- {subsection['title']} (页码: {subsection['page']})\n"
-
-            # 将完整章节内容作为一个项添加到列表
-            detailed_outline.append(chapter_content)
-
-
-
-        logger.info(f"提取整体大纲完成{overall_outline}")
-        logger.info(f"提取详细大纲完成{detailed_outline}")
-
-        # 准备审查数据
-        review_data = {
-            'outline_content': outline_content,
-            'overall_outline': overall_outline,
-            'detailed_outline': detailed_outline,
-            'state': state,
-            'stage_name': stage_name
-        }
-
-        # 调用outline_reviewer进行审查
-        try:
-            outline_review_result = await self.outline_reviewer.outline_review(review_data, trace_id_idx, state,stage_name)
-        except Exception as e:
-            logger.warning(f"大纲审查失败,但返回提取结果: {str(e)}")
-            outline_review_result = None
-
-        # 确保目录存在
-        # import os
-        # os.makedirs("temp/outline_result_temp", exist_ok=True)
-
-        # # with open("temp/outline_result_temp/outline_result.json","w",encoding="utf-8") as f:
-        # #     json.dump(outline_review_result,f,ensure_ascii=False,indent=4)
-        # 返回提取的大纲结果和审查结果
-        return {
-            'outline_review_result': outline_review_result
-        }
-
     async def basic_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
                                    review_location_label: str,state:str,stage_name:str) -> Dict[str, Any]:
         """
@@ -422,10 +344,13 @@ class AIReviewEngine(BaseReviewer):
         """
         # 向量检索
         query_content = unit_content['content']
-        vector_results = retrieval_manager.multi_stage_recall(self.milvus_collection, query_content)
+        logger.info("构建查询对")
+        query_pairs = query_rewrite_manager.query_extract(query_content)
+        bfp_result_lists =entity_enhance.entities_enhance_retrieval(query_pairs)
 
+        logger.info(f"bfp_result_lists{bfp_result_lists}")
         # 检查是否有检索结果
-        if not vector_results:
+        if not bfp_result_lists:
             logger.warning("RAG检索未返回任何结果")
             return {
                 'vector_search': [],
@@ -434,9 +359,21 @@ class AIReviewEngine(BaseReviewer):
                 'text_content': '',
                 'metadata': {}
             }
-        logger.info(f"RAG检索返回了 {len(vector_results)} 个结果")
-        # 获取第一个结果的信息
-        first_result = vector_results[0]
+        logger.info(f"RAG检索返回了 {len(bfp_result_lists)} 个查询对结果")
+        # 获取第一个查询对的第一个结果
+        first_result_list = bfp_result_lists[0]
+
+        if not first_result_list:
+            logger.warning("第一个查询对无检索结果")
+            return {
+                'vector_search': [],
+                'retrieval_status': 'no_results',
+                'file_name': '',
+                'text_content': '',
+                'metadata': {}
+            }
+
+        first_result = first_result_list[0]
         file_name = first_result['metadata'].get('file_name', 'unknown')
         text_content = first_result['text_content']
 
@@ -446,8 +383,6 @@ class AIReviewEngine(BaseReviewer):
         'metadata': first_result['metadata']
         }
         
-
-
     async def check_grammar(self, trace_id_idx: str, review_content: str, review_references: str,
                           review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
         """
@@ -578,21 +513,88 @@ class AIReviewEngine(BaseReviewer):
         return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
                                reference_source, review_location_label, state, stage_name)
 
-    # RAG检索增强
-
-    def _calculate_basic_score(self, grammar: Dict, semantic: Dict, completeness: Dict =None) -> float:
+    async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],
+                                   state:dict,stage_name:str) -> Dict[str, Any]:
         """
-        计算基础合规性得分
+        大纲审查
 
         Args:
-            grammar: 语法检查结果
-            semantic: 语义检查结果
-            completeness: 完整性检查结果
-
-        Returns:
-            float: 基础合规性平均得分
+            trace_id_idx: 追踪ID索引
+            outline_content: 大纲内容
+            state: 状态
+            stage_name: 阶段名称
         """
-        return (grammar.get('score', 0) + semantic.get('score', 0) + completeness.get('score', 0)) / 3
+        logger.info(f"开始大纲审查,trace_id: {trace_id_idx}")
+
+        # 1. 获取整体大纲(1级大纲目录)
+        overall_outline = ""
+
+        # 添加调试信息
+        logger.info(f"outline_content结构: {list(outline_content.keys()) if outline_content else 'None'}")
+        outline_data = outline_content.get('outline', {})
+        logger.info(f"outline_data结构: {list(outline_data.keys()) if outline_data else 'None'}")
+        chapters = outline_data.get('chapters', [])
+        logger.info(f"chapters数量: {len(chapters)}")
+
+        for i, chapter in enumerate(chapters):
+            chapter_title = chapter.get('title', 'N/A')
+            chapter_page = chapter.get('page', 'N/A')
+            logger.info(f"章节{i+1}: {chapter_title} (页码: {chapter_page})")
+            overall_outline += f"{chapter_title} (页码: {chapter_page})\n"
+
+        logger.info(f"生成的overall_outline长度: {len(overall_outline)}")
+        if overall_outline:
+            logger.info(f"overall_outline内容: {overall_outline[:200]}...")
+
+        # 2. 获取大纲各章节及其子目录的详细信息
+        detailed_outline = []
+
+        for chapter in chapters:
+            # 将每个章节作为整体项,包含标题、页码和子目录
+            chapter_content = f"\n{chapter['title']} (页码: {chapter['page']})\n"
+
+            # 添加子目录(如果有)
+            subsections = chapter.get('subsections', [])
+            if subsections:
+                chapter_content += "包含子目录:\n"
+                for subsection in subsections:
+                    indent = "  " * (subsection['level'] - 1)
+                    chapter_content += f"{indent}- {subsection['title']} (页码: {subsection['page']})\n"
+
+            # 将完整章节内容作为一个项添加到列表
+            detailed_outline.append(chapter_content)
+
+
+
+        logger.info(f"提取整体大纲完成{overall_outline}")
+        logger.info(f"提取详细大纲完成{detailed_outline}")
+
+        # 准备审查数据
+        review_data = {
+            'outline_content': outline_content,
+            'overall_outline': overall_outline,
+            'detailed_outline': detailed_outline,
+            'state': state,
+            'stage_name': stage_name
+        }
+
+        # 调用outline_reviewer进行审查
+        try:
+            outline_review_result = await self.outline_reviewer.outline_review(review_data, trace_id_idx, state,stage_name)
+        except Exception as e:
+            logger.warning(f"大纲审查失败,但返回提取结果: {str(e)}")
+            outline_review_result = None
+
+        # 确保目录存在
+        # import os
+        # os.makedirs("temp/outline_result_temp", exist_ok=True)
+
+        # # with open("temp/outline_result_temp/outline_result.json","w",encoding="utf-8") as f:
+        # #     json.dump(outline_review_result,f,ensure_ascii=False,indent=4)
+        # 返回提取的大纲结果和审查结果
+        return {
+            'outline_review_result': outline_review_result
+        }
 
     async def prep_basis_review(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:

+ 2 - 1
core/construction_review/component/reviewers/prompt/query_extract.yaml

@@ -35,10 +35,11 @@ query_extract:
     - 仅输出 JSON 字符串,且使用```json``` 包装。
     - background 尽量提取完整,避免信息丢失。
     - background 务必忠实与原文,不得胡编乱造,且提取完整
+    - 务必遵循<任务> 中的提取数量要求。
 
   user_template: |
     ## 任务
-    从文本中提取 **1-3个** 最关键的实体对象。视信息密度而定,优先关注**高风险工程部位**(如基坑、支架)和**核心受力构件**。
+    从文本中提取 **2个** 最关键的实体对象。视信息密度而定,优先关注**高风险工程部位**(如基坑、支架)和**核心受力构件**。
 
     ## 示例 1 (避免泛化词提取)
     文本: "大型临时设施选用的原材料、构件、扣件和其他重要受力的辅助材料进行质量验收。严禁使用不合格材料。"

+ 3 - 3
data_pipeline/RAG_recall/rag_miluvs/foundation/ai/models/rerank_model.py

@@ -18,10 +18,10 @@ class LqReranker:
     """
 
     def __init__(self):
-        self.api_url = config_handler.get('rerank_model', 'BGE_RERANKER_SERVER_RUL')
-        self.model = config_handler.get('rerank_model', 'BGE_RERANKER_MODEL_ID')
+        self.api_url = config_handler.get('rerank_model', 'LQ_QWEN3_8B_RERANKER_SERVER_URL')
+        self.model = config_handler.get('rerank_model', 'LQ_QWEN3_8B_RERANKER_MODEL')
         # 确保top_k是整数类型,避免切片错误
-        self.top_k = int(config_handler.get('rerank_model', 'BGE_RERANKER_TOP_N', 5))
+        self.top_k = int(config_handler.get('rerank_model', 'LQ_QWEN3_8B_RERANKER_TOP_N', 5))
         
     def bge_rerank(self,query: str, candidates: List[str],top_k :int = None) -> List[Dict[str, Any]]:
         """

+ 535 - 116
foundation/ai/models/model_handler.py

@@ -16,21 +16,46 @@ AI模型处理器
 - lq_qwen3_4b: 本地Qwen3-4B模型
 - qwen_local_14b: 本地Qwen3-14B模型
 - lq_qwen3_8b_emd: 本地Qwen3-Embedding-8B嵌入模型
+- siliconflow_embed: 硅基流动Qwen3-Embedding-8B嵌入模型
 - lq_bge_reranker_v2_m3: 本地BGE-reranker-v2-m3重排序模型
 """
 
+# 禁用 transformers 的深度学习框架检测,避免启动时耗时扫描
+import os
+os.environ["TRANSFORMERS_VERBOSITY"] = "error"
+os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
 
-
+import requests
 from langchain_openai import ChatOpenAI, OpenAIEmbeddings
 from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import server_logger as logger
 
 
+class ModelConnectionError(Exception):
+    """模型连接错误"""
+    pass
+
+
+class ModelConfigError(Exception):
+    """模型配置错误"""
+    pass
+
+
+class ModelAPIError(Exception):
+    """模型API调用错误"""
+    pass
+
+
 class ModelHandler:
     """
     AI模型处理器类,用于管理多种AI模型的创建和配置
     """
 
+    # 模型连接超时时间配置(秒)
+    CONNECTION_TIMEOUT = 30
+    REQUEST_TIMEOUT = 120
+    MAX_RETRIES = 2
+
     def __init__(self):
         """
         初始化模型处理器
@@ -38,6 +63,71 @@ class ModelHandler:
         加载配置处理器,用于后续读取各种模型的配置信息
         """
         self.config = config_handler
+        self._model_cache = {}  # 模型实例缓存
+
+    def _check_connection(self, base_url: str, api_key: str = None, timeout: int = 5) -> bool:
+        """
+        检查模型服务连接是否可用
+
+        Args:
+            base_url: 模型服务地址
+            api_key: API密钥(可选)
+            timeout: 超时时间(秒)
+
+        Returns:
+            bool: 连接是否可用
+        """
+        try:
+            # 构造健康检查URL
+            health_url = f"{base_url.rstrip('/')}/models"
+
+            headers = {}
+            if api_key and api_key != "dummy":
+                headers["Authorization"] = f"Bearer {api_key}"
+
+            response = requests.get(
+                health_url,
+                headers=headers,
+                timeout=timeout
+            )
+
+            # 200-299 都认为可用
+            return 200 <= response.status_code < 300
+
+        except requests.exceptions.Timeout:
+            logger.warning(f"连接超时: {base_url}")
+            return False
+        except requests.exceptions.ConnectionError as e:
+            logger.warning(f"连接错误: {base_url}, 错误: {e}")
+            return False
+        except Exception as e:
+            logger.warning(f"连接检查异常: {base_url}, 错误: {e}")
+            return False
+
+    def _handle_model_error(self, model_name: str, error: Exception, fallback_model=None):
+        """
+        统一处理模型错误
+
+        Args:
+            model_name: 模型名称
+            error: 异常对象
+            fallback_model: 降级模型实例(可选)
+
+        Returns:
+            降级模型实例,如果不可用则返回None
+        """
+        error_type = type(error).__name__
+        error_msg = str(error)
+
+        logger.error(f"模型初始化失败 [{model_name}]: {error_type} - {error_msg}")
+
+        # 如果提供了降级模型,记录日志并返回
+        if fallback_model:
+            logger.warning(f"使用降级模型: {fallback_model.__class__.__name__}")
+            return fallback_model
+
+        # 如果没有降级模型,返回None让调用方处理
+        return None
 
     def get_models(self):
         """
@@ -54,29 +144,104 @@ class ModelHandler:
         model_type = self.config.get("model", "MODEL_TYPE")
         logger.info(f"正在初始化AI模型,模型类型: {model_type}")
 
-        if model_type == "doubao":
-            model = self._get_doubao_model()
-        elif model_type == "gemini":
-            model = self._get_gemini_model()
-        elif model_type == "qwen":
-            model = self._get_qwen_model()
-        elif model_type == "deepseek":
-            model = self._get_deepseek_model()
-        elif model_type == "lq_qwen3_8b":
-            model = self._get_lq_qwen3_8b_model()
-        elif model_type == "lq_qwen3_8b_lq_lora":
-            model = self._get_lq_qwen3_8b_lora_model()
-        elif model_type == "lq_qwen3_4b":
-            model = self._get_lq_qwen3_4b_model()
-        elif model_type == "qwen_local_14b":
-            model = self._get_qwen_local_14b_model()
-        else:
-            # 默认返回gemini
-            logger.warning(f"未知的模型类型 '{model_type}',使用默认gemini模型")
-            model = self._get_gemini_model()
-
-        logger.info(f"AI模型初始化完成: {model_type}")
-        return model
+        # 检查缓存
+        cache_key = f"chat_{model_type}"
+        if cache_key in self._model_cache:
+            logger.info(f"使用缓存的模型: {model_type}")
+            return self._model_cache[cache_key]
+
+        model = None
+
+        try:
+            if model_type == "doubao":
+                model = self._get_doubao_model()
+            elif model_type == "gemini":
+                model = self._get_gemini_model()
+            elif model_type == "qwen":
+                model = self._get_qwen_model()
+            elif model_type == "deepseek":
+                model = self._get_deepseek_model()
+            elif model_type == "lq_qwen3_8b":
+                model = self._get_lq_qwen3_8b_model()
+            elif model_type == "lq_qwen3_8b_lq_lora":
+                model = self._get_lq_qwen3_8b_lora_model()
+            elif model_type == "lq_qwen3_4b":
+                model = self._get_lq_qwen3_4b_model()
+            elif model_type == "qwen_local_14b":
+                model = self._get_qwen_local_14b_model()
+            else:
+                # 默认返回gemini
+                logger.warning(f"未知的模型类型 '{model_type}',使用默认gemini模型")
+                model = self._get_gemini_model()
+
+            if model:
+                self._model_cache[cache_key] = model
+                logger.info(f"AI模型初始化完成: {model_type}")
+                return model
+            else:
+                raise ModelAPIError(f"模型初始化返回None: {model_type}")
+
+        except Exception as e:
+            logger.error(f"获取模型失败 [{model_type}]: {e}")
+
+            # 尝试使用gemini作为降级方案
+            if model_type != "gemini":
+                logger.info("尝试使用Gemini模型作为降级方案")
+                try:
+                    fallback_model = self._get_gemini_model()
+                    if fallback_model:
+                        self._model_cache[cache_key] = fallback_model
+                        logger.warning(f"已切换到Gemini降级模型")
+                        return fallback_model
+                except Exception as fallback_error:
+                    logger.error(f"降级模型也失败: {fallback_error}")
+
+            # 如果所有模型都失败,抛出异常
+            raise ModelConnectionError(f"无法初始化任何模型服务: {e}")
+
+    def get_embedding_model(self):
+        """
+        获取Embedding模型实例
+
+        Returns:
+            OpenAIEmbeddings: 配置好的Embedding模型实例
+
+        Note:
+            根据配置文件中的EMBEDDING_MODEL_TYPE参数选择对应模型
+            支持的模型类型:lq_qwen3_8b_emd, siliconflow_embed
+            默认返回本地 lq_qwen3_8b_emd 模型
+        """
+        embedding_model_type = self.config.get("model", "EMBEDDING_MODEL_TYPE", "lq_qwen3_8b_emd")
+        logger.info(f"正在初始化Embedding模型,模型类型: {embedding_model_type}")
+
+        # 检查缓存
+        cache_key = f"embed_{embedding_model_type}"
+        if cache_key in self._model_cache:
+            logger.info(f"使用缓存的Embedding模型: {embedding_model_type}")
+            return self._model_cache[cache_key]
+
+        model = None
+
+        try:
+            if embedding_model_type == "siliconflow_embed":
+                model = self._get_siliconflow_embedding_model()
+            elif embedding_model_type == "lq_qwen3_8b_emd":
+                model = self._get_lq_qwen3_8b_emd()
+            else:
+                # 默认返回本地模型
+                logger.warning(f"未知的Embedding模型类型 '{embedding_model_type}',使用默认本地模型")
+                model = self._get_lq_qwen3_8b_emd()
+
+            if model:
+                self._model_cache[cache_key] = model
+                logger.info(f"Embedding模型初始化完成: {embedding_model_type}")
+                return model
+            else:
+                raise ModelAPIError(f"Embedding模型初始化返回None: {embedding_model_type}")
+
+        except Exception as e:
+            logger.error(f"获取Embedding模型失败 [{embedding_model_type}]: {e}")
+            raise ModelConnectionError(f"无法初始化Embedding模型服务: {e}")
 
     def _get_doubao_model(self):
         """
@@ -85,20 +250,47 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的豆包模型实例
         """
-        doubao_url = self.config.get("doubao", "DOUBAO_SERVER_URL")
-        doubao_model_id = self.config.get("doubao", "DOUBAO_MODEL_ID")
-        doubao_api_key = self.config.get("doubao", "DOUBAO_API_KEY")
-
-        llm = ChatOpenAI(
-            base_url=doubao_url,
-            model=doubao_model_id,
-            api_key=doubao_api_key,
-            temperature=0.7,
-            extra_body={
-                "enable_thinking": False,
-            })
-
-        return llm
+        try:
+            doubao_url = self.config.get("doubao", "DOUBAO_SERVER_URL")
+            doubao_model_id = self.config.get("doubao", "DOUBAO_MODEL_ID")
+            doubao_api_key = self.config.get("doubao", "DOUBAO_API_KEY")
+
+            # 验证配置完整性
+            if not all([doubao_url, doubao_model_id, doubao_api_key]):
+                missing = []
+                if not doubao_url:
+                    missing.append("DOUBAO_SERVER_URL")
+                if not doubao_model_id:
+                    missing.append("DOUBAO_MODEL_ID")
+                if not doubao_api_key:
+                    missing.append("DOUBAO_API_KEY")
+                raise ModelConfigError(f"豆包模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查连接
+            if not self._check_connection(doubao_url, doubao_api_key):
+                logger.warning(f"豆包模型服务连接失败: {doubao_url}")
+                raise ModelConnectionError(f"无法连接到豆包模型服务: {doubao_url}")
+
+            llm = ChatOpenAI(
+                base_url=doubao_url,
+                model=doubao_model_id,
+                api_key=doubao_api_key,
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+                extra_body={
+                    "enable_thinking": False,
+                })
+
+            logger.info(f"豆包模型初始化成功: {doubao_model_id}")
+            return llm
+
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"豆包模型初始化异常: {e}")
+            return self._handle_model_error("doubao", error)
 
     def _get_qwen_model(self):
         """
@@ -107,20 +299,47 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的通义千问模型实例
         """
-        qwen_url = self.config.get("qwen", "QWEN_SERVER_URL")
-        qwen_model_id = self.config.get("qwen", "QWEN_MODEL_ID")
-        qwen_api_key = self.config.get("qwen", "QWEN_API_KEY")
-
-        llm = ChatOpenAI(
-            base_url=qwen_url,
-            model=qwen_model_id,
-            api_key=qwen_api_key,
-            temperature=0.7,
-            extra_body={
-                "enable_thinking": False,
-            })
-
-        return llm
+        try:
+            qwen_url = self.config.get("qwen", "QWEN_SERVER_URL")
+            qwen_model_id = self.config.get("qwen", "QWEN_MODEL_ID")
+            qwen_api_key = self.config.get("qwen", "QWEN_API_KEY")
+
+            # 验证配置完整性
+            if not all([qwen_url, qwen_model_id, qwen_api_key]):
+                missing = []
+                if not qwen_url:
+                    missing.append("QWEN_SERVER_URL")
+                if not qwen_model_id:
+                    missing.append("QWEN_MODEL_ID")
+                if not qwen_api_key:
+                    missing.append("QWEN_API_KEY")
+                raise ModelConfigError(f"通义千问模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查连接
+            if not self._check_connection(qwen_url, qwen_api_key):
+                logger.warning(f"通义千问模型服务连接失败: {qwen_url}")
+                raise ModelConnectionError(f"无法连接到通义千问模型服务: {qwen_url}")
+
+            llm = ChatOpenAI(
+                base_url=qwen_url,
+                model=qwen_model_id,
+                api_key=qwen_api_key,
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+                extra_body={
+                    "enable_thinking": False,
+                })
+
+            logger.info(f"通义千问模型初始化成功: {qwen_model_id}")
+            return llm
+
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"通义千问模型初始化异常: {e}")
+            return self._handle_model_error("qwen", error)
 
     def _get_deepseek_model(self):
         """
@@ -129,20 +348,47 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的DeepSeek模型实例
         """
-        deepseek_url = self.config.get("deepseek", "DEEPSEEK_SERVER_URL")
-        deepseek_model_id = self.config.get("deepseek", "DEEPSEEK_MODEL_ID")
-        deepseek_api_key = self.config.get("deepseek", "DEEPSEEK_API_KEY")
-
-        llm = ChatOpenAI(
-            base_url=deepseek_url,
-            model=deepseek_model_id,
-            api_key=deepseek_api_key,
-            temperature=0.7,
-            extra_body={
-                "enable_thinking": False,
-            })
-
-        return llm
+        try:
+            deepseek_url = self.config.get("deepseek", "DEEPSEEK_SERVER_URL")
+            deepseek_model_id = self.config.get("deepseek", "DEEPSEEK_MODEL_ID")
+            deepseek_api_key = self.config.get("deepseek", "DEEPSEEK_API_KEY")
+
+            # 验证配置完整性
+            if not all([deepseek_url, deepseek_model_id, deepseek_api_key]):
+                missing = []
+                if not deepseek_url:
+                    missing.append("DEEPSEEK_SERVER_URL")
+                if not deepseek_model_id:
+                    missing.append("DEEPSEEK_MODEL_ID")
+                if not deepseek_api_key:
+                    missing.append("DEEPSEEK_API_KEY")
+                raise ModelConfigError(f"DeepSeek模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查连接
+            if not self._check_connection(deepseek_url, deepseek_api_key):
+                logger.warning(f"DeepSeek模型服务连接失败: {deepseek_url}")
+                raise ModelConnectionError(f"无法连接到DeepSeek模型服务: {deepseek_url}")
+
+            llm = ChatOpenAI(
+                base_url=deepseek_url,
+                model=deepseek_model_id,
+                api_key=deepseek_api_key,
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+                extra_body={
+                    "enable_thinking": False,
+                })
+
+            logger.info(f"DeepSeek模型初始化成功: {deepseek_model_id}")
+            return llm
+
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"DeepSeek模型初始化异常: {e}")
+            return self._handle_model_error("deepseek", error)
 
     def _get_gemini_model(self):
         """
@@ -151,18 +397,45 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的Gemini模型实例
         """
-        gemini_url = self.config.get("gemini", "GEMINI_SERVER_URL")
-        gemini_model_id = self.config.get("gemini", "GEMINI_MODEL_ID")
-        gemini_api_key = self.config.get("gemini", "GEMINI_API_KEY")
-
-        llm = ChatOpenAI(
-            base_url=gemini_url,
-            model=gemini_model_id,
-            api_key=gemini_api_key,
-            temperature=0.7,
+        try:
+            gemini_url = self.config.get("gemini", "GEMINI_SERVER_URL")
+            gemini_model_id = self.config.get("gemini", "GEMINI_MODEL_ID")
+            gemini_api_key = self.config.get("gemini", "GEMINI_API_KEY")
+
+            # 验证配置完整性
+            if not all([gemini_url, gemini_model_id, gemini_api_key]):
+                missing = []
+                if not gemini_url:
+                    missing.append("GEMINI_SERVER_URL")
+                if not gemini_model_id:
+                    missing.append("GEMINI_MODEL_ID")
+                if not gemini_api_key:
+                    missing.append("GEMINI_API_KEY")
+                raise ModelConfigError(f"Gemini模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查连接
+            if not self._check_connection(gemini_url, gemini_api_key):
+                logger.warning(f"Gemini模型服务连接失败: {gemini_url}")
+                raise ModelConnectionError(f"无法连接到Gemini模型服务: {gemini_url}")
+
+            llm = ChatOpenAI(
+                base_url=gemini_url,
+                model=gemini_model_id,
+                api_key=gemini_api_key,
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
             )
 
-        return llm
+            logger.info(f"Gemini模型初始化成功: {gemini_model_id}")
+            return llm
+
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"Gemini模型初始化异常: {e}")
+            return self._handle_model_error("gemini", error)
 
     def _get_lq_qwen3_8b_model(self):
         """
@@ -171,14 +444,31 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的本地Qwen3-8B模型实例
         """
-        llm = ChatOpenAI(
-            base_url="http://192.168.91.253:9002/v1",
-            model="Qwen3-8B",
-            api_key="dummy",  # 本地模型使用虚拟API key
-            temperature=0.7,
-        )
+        try:
+            server_url = "http://192.168.91.253:9002/v1"
+            model_id = "Qwen3-8B"
+
+            # 检查本地服务连接
+            if not self._check_connection(server_url, "dummy", timeout=3):
+                logger.warning(f"本地Qwen3-8B模型服务连接失败: {server_url}")
+                raise ModelConnectionError(f"无法连接到本地Qwen3-8B模型服务: {server_url}")
+
+            llm = ChatOpenAI(
+                base_url=server_url,
+                model=model_id,
+                api_key="dummy",
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+            )
+
+            logger.info(f"本地Qwen3-8B模型初始化成功: {model_id}")
+            return llm
 
-        return llm
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"本地Qwen3-8B模型初始化异常: {e}")
+            return self._handle_model_error("lq_qwen3_8b", error)
 
     def _get_lq_qwen3_8b_lora_model(self):
         """
@@ -187,18 +477,43 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的本地Qwen3-8B-lq-lora模型实例
         """
-        server_url = self.config.get("lq_qwen3_8B_lora", "LQ_QWEN3_8B_LQ_LORA_SERVER_URL")
-        model_id = self.config.get("lq_qwen3_8B_lora", "LQ_QWEN3_8B_LQ_LORA_MODEL_ID")
-        api_key = self.config.get("lq_qwen3_8B_lora", "LQ_QWEN3_8B_LQ_LORA_API_KEY")
+        try:
+            server_url = self.config.get("lq_qwen3_8B_lora", "LQ_QWEN3_8B_LQ_LORA_SERVER_URL")
+            model_id = self.config.get("lq_qwen3_8B_lora", "LQ_QWEN3_8B_LQ_LORA_MODEL_ID")
+            api_key = self.config.get("lq_qwen3_8B_lora", "LQ_QWEN3_8B_LQ_LORA_API_KEY", "dummy")
+
+            # 验证配置完整性
+            if not all([server_url, model_id]):
+                missing = []
+                if not server_url:
+                    missing.append("LQ_QWEN3_8B_LQ_LORA_SERVER_URL")
+                if not model_id:
+                    missing.append("LQ_QWEN3_8B_LQ_LORA_MODEL_ID")
+                raise ModelConfigError(f"本地Qwen3-8B-lq-lora模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查本地服务连接
+            if not self._check_connection(server_url, api_key, timeout=3):
+                logger.warning(f"本地Qwen3-8B-lq-lora模型服务连接失败: {server_url}")
+                raise ModelConnectionError(f"无法连接到本地Qwen3-8B-lq-lora模型服务: {server_url}")
+
+            llm = ChatOpenAI(
+                base_url=server_url,
+                model=model_id,
+                api_key=api_key,
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+            )
 
-        llm = ChatOpenAI(
-            base_url=server_url,
-            model=model_id,
-            api_key=api_key,
-            temperature=0.7,
-        )
+            logger.info(f"本地Qwen3-8B-lq-lora模型初始化成功: {model_id}")
+            return llm
 
-        return llm
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"本地Qwen3-8B-lq-lora模型初始化异常: {e}")
+            return self._handle_model_error("lq_qwen3_8b_lora", error)
 
     def _get_lq_qwen3_4b_model(self):
         """
@@ -207,14 +522,31 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的本地Qwen3-4B模型实例
         """
-        llm = ChatOpenAI(
-            base_url="http://192.168.91.253:9001/v1",
-            model="Qwen3-4B",
-            api_key="dummy",  # 本地模型使用虚拟API key
-            temperature=0.7,
-        )
+        try:
+            server_url = "http://192.168.91.253:9001/v1"
+            model_id = "Qwen3-4B"
+
+            # 检查本地服务连接
+            if not self._check_connection(server_url, "dummy", timeout=3):
+                logger.warning(f"本地Qwen3-4B模型服务连接失败: {server_url}")
+                raise ModelConnectionError(f"无法连接到本地Qwen3-4B模型服务: {server_url}")
+
+            llm = ChatOpenAI(
+                base_url=server_url,
+                model=model_id,
+                api_key="dummy",
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+            )
+
+            logger.info(f"本地Qwen3-4B模型初始化成功: {model_id}")
+            return llm
 
-        return llm
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"本地Qwen3-4B模型初始化异常: {e}")
+            return self._handle_model_error("lq_qwen3_4b", error)
 
     def _get_qwen_local_14b_model(self):
         """
@@ -223,14 +555,31 @@ class ModelHandler:
         Returns:
             ChatOpenAI: 配置好的本地Qwen3-14B模型实例
         """
-        llm = ChatOpenAI(
-            base_url="http://192.168.91.253:9003/v1",
-            model="Qwen3-14B",
-            api_key="dummy",  # 本地模型使用虚拟API key
-            temperature=0.7,
-        )
+        try:
+            server_url = "http://192.168.91.253:9003/v1"
+            model_id = "Qwen3-14B"
+
+            # 检查本地服务连接
+            if not self._check_connection(server_url, "dummy", timeout=3):
+                logger.warning(f"本地Qwen3-14B模型服务连接失败: {server_url}")
+                raise ModelConnectionError(f"无法连接到本地Qwen3-14B模型服务: {server_url}")
+
+            llm = ChatOpenAI(
+                base_url=server_url,
+                model=model_id,
+                api_key="dummy",
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+            )
+
+            logger.info(f"本地Qwen3-14B模型初始化成功: {model_id}")
+            return llm
 
-        return llm
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"本地Qwen3-14B模型初始化异常: {e}")
+            return self._handle_model_error("qwen_local_14b", error)
 
     def _get_lq_qwen3_8b_emd(self):
         """
@@ -239,13 +588,79 @@ class ModelHandler:
         Returns:
             OpenAIEmbeddings: 配置好的本地Qwen3-Embedding-8B嵌入模型实例
         """
-        embeddings = OpenAIEmbeddings(
-            base_url="http://192.168.91.253:9003/v1",
-            model="Qwen3-Embedding-8B",
-            api_key="dummy",  # 本地模型使用虚拟API key
-        )
+        try:
+            server_url = "http://192.168.91.253:9003/v1"
+            model_id = "Qwen3-Embedding-8B"
+
+            # 检查本地服务连接
+            if not self._check_connection(server_url, "dummy", timeout=3):
+                logger.warning(f"本地Qwen3-Embedding-8B模型服务连接失败: {server_url}")
+                raise ModelConnectionError(f"无法连接到本地Qwen3-Embedding-8B模型服务: {server_url}")
+
+            # 使用 langchain_openai 的 OpenAIEmbeddings
+            embeddings = OpenAIEmbeddings(
+                base_url=server_url,
+                model=model_id,
+                api_key="dummy",  # 本地模型使用虚拟API key
+                timeout=self.REQUEST_TIMEOUT,
+            )
+
+            logger.info(f"本地Qwen3-Embedding-8B模型初始化成功: {model_id}")
+            return embeddings
+
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"本地Qwen3-Embedding-8B模型初始化异常: {e}")
+            return self._handle_model_error("lq_qwen3_8b_emd", error)
+
+    def _get_siliconflow_embedding_model(self):
+        """
+        获取硅基流动Qwen3-Embedding-8B嵌入模型
+
+        Returns:
+            OpenAIEmbeddings: 配置好的硅基流动Qwen3-Embedding-8B嵌入模型实例
+        """
+        try:
+            server_url = self.config.get("siliconflow_embed", "SLCF_EMBED_SERVER_URL")
+            api_key = self.config.get("siliconflow_embed", "SLCF_EMBED_API_KEY")
+            model_id = self.config.get("siliconflow_embed", "SLCF_EMBED_MODEL_ID", "Qwen/Qwen3-Embedding-8B")
+            dimensions = self.config.get("siliconflow_embed", "SLCF_EMBED_DIMENSIONS", "4096")
+
+            # 验证配置完整性
+            if not all([server_url, api_key, model_id]):
+                missing = []
+                if not server_url:
+                    missing.append("SLCF_EMBED_SERVER_URL")
+                if not api_key:
+                    missing.append("SLCF_EMBED_API_KEY")
+                if not model_id:
+                    missing.append("SLCF_EMBED_MODEL_ID")
+                raise ModelConfigError(f"硅基流动Embedding模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查连接
+            if not self._check_connection(server_url, api_key):
+                logger.warning(f"硅基流动Embedding模型服务连接失败: {server_url}")
+                raise ModelConnectionError(f"无法连接到硅基流动Embedding模型服务: {server_url}")
+
+            # 使用 langchain_openai 的 OpenAIEmbeddings
+            embeddings = OpenAIEmbeddings(
+                base_url=server_url,
+                model=model_id,
+                api_key=api_key,
+                timeout=self.REQUEST_TIMEOUT,
+            )
+
+            logger.info(f"硅基流动Embedding模型初始化成功: {model_id} (dimensions: {dimensions})")
+            return embeddings
 
-        return embeddings
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"硅基流动Embedding模型初始化异常: {e}")
+            return self._handle_model_error("siliconflow_embed", error)
     
 
 
@@ -264,6 +679,10 @@ def get_models():
     Note:
         这是一个便捷函数,直接使用全局model_handler实例获取模型
     """
-    llm = model_handler.get_models()
-    # 暂时返回相同的模型作为chat和embed
-    return llm, llm, None
+    try:
+        llm = model_handler.get_models()
+        # 暂时返回相同的模型作为chat和embed
+        return llm, llm, None
+    except Exception as e:
+        logger.error(f"获取模型失败: {e}")
+        raise ModelConnectionError(f"无法获取模型服务: {e}")

+ 138 - 30
foundation/ai/models/rerank_model.py

@@ -7,6 +7,7 @@
 
 支持的重排序模型:
 - BGE Reranker (本地部署)
+- Qwen3-Reranker-8B (本地部署)
 - Qwen3-Reranker-8B (硅基流动API)
 """
 import json
@@ -22,16 +23,21 @@ class LqReranker:
     """
 
     def __init__(self):
-        self.api_url = config_handler.get('rerank_model', 'BGE_RERANKER_SERVER_RUL')
-        self.model = config_handler.get('rerank_model', 'BGE_RERANKER_MODEL_ID')
-        # 确保top_k是整数类型,避免切片错误
-        self.top_k = int(config_handler.get('rerank_model', 'BGE_RERANKER_TOP_N', 3))
-
-        # Qwen3-Reranker-8B 配置
-        self.qwen_api_url = config_handler.get('rerank_model_qwen', 'QWEN_RERANKER_API_URL', 'https://api.siliconflow.cn/v1/rerank')
-        self.qwen_api_key = config_handler.get('rerank_model_qwen', 'QWEN_RERANKER_API_KEY')
-        self.qwen_model = config_handler.get('rerank_model_qwen', 'QWEN_RERANKER_MODEL', 'Qwen/Qwen3-Reranker-8B')
-        
+        # BGE Reranker 配置
+        self.bge_api_url = config_handler.get('bge_rerank_model', 'BGE_RERANKER_SERVER_URL')
+        self.bge_model = config_handler.get('bge_rerank_model', 'BGE_RERANKER_MODEL')
+        self.bge_top_k = int(config_handler.get('bge_rerank_model', 'BGE_RERANKER_TOP_N', 10))
+
+        # 本地Qwen3-Reranker-8B配置
+        self.lq_rerank_api_url = config_handler.get('lq_rerank_model', 'LQ_RERANKER_SERVER_URL')
+        self.lq_rerank_model = config_handler.get('lq_rerank_model', 'LQ_RERANKER_MODEL')
+        self.lq_rerank_top_k = int(config_handler.get('lq_rerank_model', 'LQ_RERANKER_TOP_N', 10))
+
+        # 硅基流动Qwen3-Reranker-8B配置
+        self.silicoflow_rerank_api_url = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_URL', 'https://api.siliconflow.cn/v1/rerank')
+        self.silicoflow_rerank_api_key = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_API_KEY')
+        self.silicoflow_rerank_model = config_handler.get('silicoflow_rerank_model', 'SILICOFLOW_RERANKER_MODEL', 'Qwen/Qwen3-Reranker-8B')
+
     def bge_rerank(self,query: str, candidates: List[str],top_k :int = None) -> List[Dict[str, Any]]:
         """
         执行重排序的全局函数
@@ -48,20 +54,20 @@ class LqReranker:
         try:
             # self.top_k 是config.ini生产环境中实际使用的重排序数量,bge_rerank中的top_k,用于开发环境中快速效果调试
             if not top_k:# 如果开发top_k未指定,则使用配置文件中的top_k
-                top_k = self.top_k
+                top_k = self.bge_top_k
             
 
             server_logger.info(f"开始执行重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
 
             # 构建重排序请求
             rerank_request = {
-                "model": "bge-reranker-v2-m3",
+                "model": self.bge_model,
                 "query": query,
                 "candidates": candidates
             }
 
             # 直接调用重排序API
-            url = self.api_url
+            url = self.bge_api_url
             headers = {
                 "Content-Type": "application/json"
             }
@@ -89,10 +95,109 @@ class LqReranker:
             # 返回原始顺序作为fallback
             return [{"text": doc, "score": "0.0"} for doc in candidates[:top_k]]
 
+    def lq_rerank(self, query: str, candidates: List[str], top_k: int = None) -> List[Dict[str, Any]]:
+        """
+        使用本地部署的 Qwen3-Reranker-8B 进行重排序
+
+        Args:
+            query: 查询文本
+            candidates: 候选文档列表
+            top_k: 返回前k个结果,默认使用配置文件的top_k
+
+        Returns:
+            List[Dict[str, Any]]: 重排序后的结果列表
+                [
+                    {
+                        "text": str,        # 文档文本内容
+                        "score": float,      # 相关性得分
+                        "index": int         # 原始索引
+                    },
+                    ...
+                ]
+        """
+        try:
+            if not top_k:
+                top_k = self.lq_rerank_top_k
+
+            # 检查query是否为空
+            if not query or not query.strip():
+                server_logger.warning(f"本地Qwen3重排序跳过:query为空")
+                return [{"text": doc, "score": 0.0} for doc in candidates[:top_k]]
+
+            server_logger.info(f"开始执行本地Qwen3重排序,查询: '{query}', 候选文档数量: {len(candidates)}")
+
+            # 定义变量(与测试脚本完全一致)
+            url = self.lq_rerank_api_url
+            prefix = '<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be "yes" or "no".<|im_end|>\n<|im_start|>user\n'
+            suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
+
+            query_template = "{prefix}<Instruct>: {instruction}\n<Query>: {query}\n"
+            document_template = "<Document>: {doc}{suffix}"
+
+            instruction = (
+                "请根据桥梁施工建设相关的查询内容,对文档进行重新排序,优先返回与桥梁施工、建设标准、技术规范、质量控制、安全管理等高度相关的文档。"
+            )
+            query = query_template.format(prefix=prefix, instruction=instruction, query=query)
+            documents = [document_template.format(doc=doc, suffix=suffix) for doc in candidates]
+
+            data = {
+                "model": self.lq_rerank_model,
+                "query": query,
+                "documents": documents
+            }
+
+            headers = {"Content-Type": "application/json"}
+
+
+            response = requests.post(url, headers=headers, json=data, timeout=30)
+
+            if response.status_code == 200:
+                result = response.json()
+  
+
+                if "results" in result:
+                    # 格式化结果:将嵌套的 document.text 提取到外层,并清理模板标记
+                    formatted_results = []
+                    for item in result["results"]:
+                        # 获取包含模板的原始文本
+                        raw_text = item.get("document", {}).get("text", "")
+
+                        # 清理模板标记:去除 <Document>: 和 <|im_end|>...assistant 之后的内容
+                        # 文本格式: <Document>: 原始内容<|im_end|>\n<|im_start|>assistant\n...
+                        if "<Document>:" in raw_text:
+                            # 提取 <Document>: 和 <|im_end|> 之间的内容
+                            start = raw_text.find("<Document>:") + len("<Document>:")
+                            end = raw_text.find("<|im_end|>")
+                            if end > start:
+                                cleaned_text = raw_text[start:end].strip()
+                            else:
+                                cleaned_text = raw_text[start:].strip()
+                        else:
+                            cleaned_text = raw_text
+                    
+                        formatted_results.append({
+                            "text": cleaned_text,
+                            "score": float(item.get("relevance_score", 0.0)),
+                            "index": item.get("index", 0)
+                        })
+
+                    server_logger.info(f"本地Qwen3 API响应: {formatted_results[:top_k]}")
+                    return formatted_results[:top_k]
+                else:
+                    server_logger.warning(f"API响应格式异常: {result}")
+                    return []
+            else:
+                server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}")
+                return []
+
+        except Exception as e:
+            server_logger.error(f"执行本地Qwen3重排序失败: {str(e)}")
+            return [{"text": doc, "score": 0.0} for doc in candidates[:top_k]]
+
     def qwen3_rerank(self, query: str, documents: List[str], top_k: int = None,
                     instruction: str = "请根据桥梁施工建设相关的查询内容,对文档进行重新排序,优先返回与桥梁施工、建设标准、技术规范、质量控制、安全管理等高度相关的文档。") -> List[Dict[str, Any]]:
         """
-        使用 Qwen3-Reranker-8B 进行重排序
+        使用硅基流动 Qwen3-Reranker-8B API 进行重排序
 
         Args:
             query: 查询文本
@@ -105,36 +210,37 @@ class LqReranker:
         """
         try:
             if not top_k:
-                top_k = self.top_k
+                top_k = 10  # 默认值
 
-            if not self.qwen_api_key:
-                server_logger.error("Qwen Reranker API Key 未配置")
+            if not self.silicoflow_rerank_api_key:
+                server_logger.error("硅基流动 Reranker API Key 未配置")
                 return []
 
-            server_logger.info(f"开始执行Qwen3重排序,查询: '{query}', 文档数量: {len(documents)}")
+            server_logger.info(f"开始执行硅基流动Qwen3重排序,查询: '{query}', 文档数量: {len(documents)}")
 
+            
             # 构建请求数据
             request_data = {
-                "model": self.qwen_model,
+                "model": self.silicoflow_rerank_model,
                 "query": query,
                 "documents": documents,
                 "instruction": instruction,
                 "top_n": top_k,
                 "return_documents": True,
-                "max_chunks_per_doc": 123,
-                "overlap_tokens": 79
+                # "max_chunks_per_doc": 123,
+                # "overlap_tokens": 79
             }
 
             headers = {
-                "Authorization": f"Bearer {self.qwen_api_key}",
+                "Authorization": f"Bearer {self.silicoflow_rerank_api_key}",
                 "Content-Type": "application/json"
             }
 
-            server_logger.debug(f"调用Qwen3 Reranker API: {self.qwen_api_url}")
+            server_logger.debug(f"调用硅基流动Qwen3 Reranker API: {self.silicoflow_rerank_api_url}")
             server_logger.debug(f"请求数据: {json.dumps(request_data, ensure_ascii=False)}")
 
             response = requests.post(
-                self.qwen_api_url,
+                self.silicoflow_rerank_api_url,
                 headers=headers,
                 json=request_data,
                 timeout=30
@@ -142,7 +248,7 @@ class LqReranker:
 
             if response.status_code == 200:
                 result = response.json()
-                server_logger.debug(f"Qwen3 API响应: {json.dumps(result, ensure_ascii=False)}")
+                server_logger.debug(f"硅基流动Qwen3 API响应: {json.dumps(result, ensure_ascii=False)}")
 
                 if "results" in result:
                     # 格式化结果为统一格式
@@ -156,15 +262,17 @@ class LqReranker:
 
                     return formatted_results[:top_k]
                 else:
-                    server_logger.warning(f"Qwen3 API响应格式异常: {result}")
+                    server_logger.warning(f"API响应格式异常: {result}")
                     return []
             else:
-                server_logger.error(f"Qwen3 API调用失败,状态码: {response.status_code}, 响应: {response.text}")
+                server_logger.error(f"API调用失败,状态码: {response.status_code}, 响应: {response.text}")
                 return []
 
         except Exception as e:
-            server_logger.error(f"执行Qwen3重排序失败: {str(e)}")
+            server_logger.error(f"执行硅基流动Qwen3重排序失败: {str(e)}")
             # 返回原始顺序作为fallback
-            return [{"text": doc, "score": 0.0, "index": i} for i, doc in enumerate(documents[:top_k])]
+            return [{"text": doc, "score": 0.0} for doc in documents[:top_k]]
+
+rerank_model = LqReranker()
+
 
-rerank_model = LqReranker()

+ 53 - 0
foundation/ai/rag/retrieval/entities_enhance.py

@@ -0,0 +1,53 @@
+import json
+import asyncio
+from foundation.observability.monitoring.time_statistics import track_execution_time
+from foundation.ai.rag.retrieval.retrieval import retrieval_manager
+from foundation.observability.logger.loggering import server_logger
+
+
+
+class EntitiesEnhance():
+
+    def __init__(self):
+        self.save_path = "temp\entity_bfp_recall\entity_bfp_recall.json"
+        self.bfp_result_lists = []
+    @track_execution_time
+    def entities_enhance_retrieval(self,query_pairs):
+        def run_async(coro):
+            """在合适的环境中运行异步函数"""
+            try:
+                loop = asyncio.get_running_loop()
+                import concurrent.futures
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    future = executor.submit(asyncio.run, coro)
+                    return future.result()
+            except RuntimeError:
+                return asyncio.run(coro)
+
+        for query_pair in query_pairs:
+            entity = query_pair['entity']
+            search_keywords = query_pair['search_keywords']
+            background = query_pair['background']
+
+            entity_list = run_async(retrieval_manager.entity_recall(
+                entity,
+                search_keywords,
+                recall_top_k=5,      # 主实体返回数量
+                max_results=5       # 最终最多返回20个实体文本
+            ))
+
+            # top_k,二次重排最多返回数量
+            bfp_result = run_async(retrieval_manager.async_bfp_recall(entity_list,background,top_k=3))
+            server_logger.info(f"bfp_result:{bfp_result}")
+            self.bfp_result_lists.append(bfp_result)
+            server_logger.info("实体增强召回结束")
+        self.test_file(self.bfp_result_lists,seve=True)
+        return self.bfp_result_lists
+            
+
+    def test_file(self,bfp_result,seve = False):
+        if seve:
+            with open(self.save_path, "w", encoding="utf-8") as f:
+                json.dump(bfp_result, f, ensure_ascii=False, indent=4)
+
+entity_enhance = EntitiesEnhance()

+ 139 - 17
foundation/ai/rag/retrieval/query_rewrite.py

@@ -2,11 +2,8 @@
 
 import uuid
 import asyncio
-from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import server_logger
 from foundation.ai.agent.generate.model_generate import generate_model_client
-from foundation.database.base.vector.milvus_vector import MilvusVectorManager
-from core.construction_review.component.reviewers.utils import prompt_loader
 
 class QueryRewriteManager():
     """
@@ -16,15 +13,31 @@ class QueryRewriteManager():
     def __init__(self):
         # 获取部署的模型列表
         self.generate_model_client = generate_model_client
-        self.prompt_loader = prompt_loader
+
+    @property
+    def prompt_loader(self):
+        """延迟加载 prompt_loader,避免循环导入"""
+        from core.construction_review.component.reviewers.utils import prompt_loader
+        return prompt_loader
 
     def query_extract(self, review_content):
         """
         从审查条文中提取query
-        return:
-            query: str
-            background: str
-            parameters: str
+
+        Args:
+            review_content: 审查内容文本
+
+        Returns:
+            list: 标准格式的查询列表
+            [
+                {
+                    "entity": str,           # 实体名称
+                    "search_keywords": list, # 搜索关键词列表
+                    "background": str,       # 背景信息
+                    "parameter": str         # 技术参数
+                }
+            ]
+            或 None(提取失败时)
         """
         try:
             # 获取提示词模板并组装
@@ -42,16 +55,125 @@ class QueryRewriteManager():
             # 生成唯一的trace_id用于追踪
             trace_id = str(uuid.uuid4())
 
-            # 调用模型生成接口(使用异步运行)
-            model_response = asyncio.run(self.generate_model_client.get_model_generate_invoke(
-                trace_id=trace_id,
-                task_prompt_info=task_prompt_info
-            ))
-
+            # 调用模型生成接口(处理异步调用)
+            try:
+                loop = asyncio.get_running_loop()
+                # 如果已有运行中的事件循环,使用create_task
+                import concurrent.futures
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    future = executor.submit(
+                        asyncio.run,
+                        self.generate_model_client.get_model_generate_invoke(
+                            trace_id=trace_id,
+                            task_prompt_info=task_prompt_info
+                        )
+                    )
+                    model_response = future.result()
+            except RuntimeError:
+                # 没有运行中的事件循环,直接使用asyncio.run
+                model_response = asyncio.run(self.generate_model_client.get_model_generate_invoke(
+                    trace_id=trace_id,
+                    task_prompt_info=task_prompt_info
+                ))
+            
+            # 格式化模型响应
+            formatted_response = self.ai_respose_format(model_response)
+            server_logger.info(f"查询对构建完成,构建 {len(formatted_response)}条。")
             # 记录日志
-            server_logger.info(f"Query 提取完成长度: {len(review_content)}")
-            return model_response
+            if formatted_response:
+                server_logger.info(f"Query 提取成功, 提取到 {len(formatted_response)} 个实体")
+            else:
+                server_logger.warning(f"Query 提取失败, 格式化后为空")
+
+            return formatted_response
 
         except Exception as e:
             server_logger.error(f"Query 提取失败: {str(e)}")
-            return None
+            return None
+    
+    def ai_respose_format(self, model_response):
+        """
+        将模型返回的响应格式化为标准格式
+
+        Args:
+            model_response: AI模型返回的原始响应(可能是字符串或已解析的JSON)
+
+        Returns:
+            list: 标准格式的查询列表
+            [
+                {
+                    "entity": str,           # 实体名称
+                    "search_keywords": list, # 搜索关键词列表
+                    "background": str,       # 背景信息
+                    "parameter": str         # 技术参数
+                }
+            ]
+            或 None(解析失败时)
+        """
+        import re
+        import json
+
+        try:
+            # 1. 如果model_response已经是list,直接返回
+            if isinstance(model_response, list):
+                server_logger.info(f"模型响应已是list格式, 包含 {len(model_response)} 个实体")
+                return model_response
+
+            # 2. 如果是dict,包装成list返回
+            if isinstance(model_response, dict):
+                server_logger.info("模型响应是dict格式, 包装为list")
+                return [model_response]
+
+            # 3. 如果是字符串,需要解析
+            if isinstance(model_response, str):
+                response_text = model_response.strip()
+                server_logger.debug(f"原始响应字符串长度: {len(response_text)}")
+
+                # 3.1 尝试去除 ```json 和 ``` 标记
+                # 匹配 ```json ... ``` 或 ``` ... ```
+                json_pattern = r'```(?:json)?\s*\n?(.*?)\n?```'
+                json_match = re.search(json_pattern, response_text, re.DOTALL | re.IGNORECASE)
+
+                if json_match:
+                    # 提取代码块中的JSON内容
+                    json_str = json_match.group(1).strip()
+                    server_logger.debug("检测到markdown代码块, 已提取纯JSON内容")
+                else:
+                    # 如果没有代码块标记,尝试直接解析整个字符串
+                    json_str = response_text
+                    server_logger.debug("未检测到markdown代码块, 尝试直接解析")
+
+                # 3.2 去除可能的Markdown注释或多余空白
+                json_str = re.sub(r'\n+', '\n', json_str)  # 多个换行压缩为一个
+                json_str = json_str.strip()
+
+                server_logger.debug(f"待解析的JSON字符串: {json_str[:200]}...")
+
+                # 3.3 解析JSON
+                parsed_data = json.loads(json_str)
+
+                # 3.4 确保返回list格式
+                if isinstance(parsed_data, list):
+                    server_logger.info(f"JSON解析成功, 提取到 {len(parsed_data)} 个实体")
+                    return parsed_data
+                elif isinstance(parsed_data, dict):
+                    server_logger.info("JSON解析成功, 单个实体包装为list")
+                    return [parsed_data]
+
+                server_logger.warning(f"无法识别的JSON格式: {type(parsed_data)}")
+                return None
+
+            server_logger.warning(f"无法识别的响应类型: {type(model_response)}")
+            return None
+
+        except json.JSONDecodeError as e:
+            server_logger.error(f"JSON解析失败: {e}")
+            server_logger.error(f"原始响应: {str(model_response)[:500]}")
+            return None
+        except Exception as e:
+            server_logger.error(f"响应格式化异常: {e}")
+            server_logger.error(f"原始响应: {str(model_response)[:500]}")
+            return None
+
+
+query_rewrite_manager = QueryRewriteManager()

+ 82 - 47
foundation/ai/rag/retrieval/retrieval.py

@@ -24,8 +24,8 @@ class RetrievalManager:
         self.dense_weight = config_handler.get('hybrid_search', 'DENSE_WEIGHT', 0.7)
         self.sparse_weight = config_handler.get('hybrid_search', 'SPARSE_WEIGHT', 0.3)
 
-        # 重排序模型配置
-        self.rerank_model_type = config_handler.get('retrieval', 'RERANK_MODEL_TYPE', 'bge')  # 'bge' 或 'qwen3'
+        # 重排序模型配置(从 [model] 部分统一管理)
+        self.rerank_model_type = config_handler.get('model', 'RERANK_MODEL_TYPE', 'bge_rerank_model')
         self.logger.info(f"初始化重排序模型类型: {self.rerank_model_type}")
 
     def set_rerank_model(self, model_type: str):
@@ -33,10 +33,11 @@ class RetrievalManager:
         设置重排序模型类型
 
         Args:
-            model_type: 模型类型 ('bge' 或 'qwen3')
+            model_type: 配置section名称 ('bge_rerank_model', 'lq_rerank_model', 'silicoflow_rerank_model')
         """
-        if model_type not in ['bge', 'qwen3']:
-            raise ValueError("model_type 必须是 'bge' 或 'qwen3'")
+        valid_models = ['bge_rerank_model', 'lq_rerank_model', 'silicoflow_rerank_model']
+        if model_type not in valid_models:
+            raise ValueError(f"model_type 必须是 {valid_models}")
 
         self.rerank_model_type = model_type
         self.logger.info(f"重排序模型类型已设置为: {model_type}")
@@ -103,34 +104,33 @@ class RetrievalManager:
             if not cleaned_documents:
                 return []
 
-            if self.rerank_model_type == 'qwen3':
-                self.logger.info("使用 Qwen3-Reranker-8B 进行重排序")
-                rerank_results = rerank_model.qwen3_rerank(query_text, cleaned_documents, top_k)
+            # 根据配置section名称路由到对应的reranker方法
+            if self.rerank_model_type == 'lq_rerank_model':
+                self.logger.info("使用本地 Qwen3-Reranker-8B (lq_rerank_model) 进行重排序")
+                rerank_results = rerank_model.lq_rerank(query_text, cleaned_documents, top_k)
 
-                # 将清理后的文本映射回原始文本
-                for result in rerank_results:
-                    cleaned_text = result.get('text', '')
-                    # 查找原始文本
-                    for i, cleaned in enumerate(cleaned_documents):
-                        if cleaned == cleaned_text:
-                            result['text'] = valid_original_docs[i]
-                            break
+            elif self.rerank_model_type == 'silicoflow_rerank_model':
+                self.logger.info("使用硅基流动 Qwen3-Reranker-8B (silicoflow_rerank_model) 进行重排序")
+                rerank_results = rerank_model.qwen3_rerank(query_text, cleaned_documents, top_k)
 
-                return rerank_results
-            else:
-                self.logger.info("使用 BGE Reranker 进行重排序")
+            else:  # bge_rerank_model (默认)
+                self.logger.info("使用 BGE Reranker (bge_rerank_model) 进行重排序")
                 rerank_results = rerank_model.bge_rerank(query_text, cleaned_documents, top_k)
 
-                # 将清理后的文本映射回原始文本
-                for result in rerank_results:
-                    cleaned_text = result.get('text', '')
-                    # 查找原始文本
-                    for i, cleaned in enumerate(cleaned_documents):
-                        if cleaned == cleaned_text:
-                            result['text'] = valid_original_docs[i]
-                            break
+            # 将清理后的文本映射回原始文本(所有reranker都需要)
+            for result in rerank_results:
+                cleaned_text = result.get('text', '')
+                # 查找原始文本
+                for i, cleaned in enumerate(cleaned_documents):
+                    if cleaned == cleaned_text:
+                        result['text'] = valid_original_docs[i]
+                        break
+
+                # 统一字段名:将 relevance_score 转换为 score
+                if 'relevance_score' in result and 'score' not in result:
+                    result['score'] = float(result['relevance_score'])
 
-                return rerank_results
+            return rerank_results
 
         except Exception as e:
             self.logger.error(f"重排序失败,模型类型: {self.rerank_model_type}, 错误: {str(e)}")
@@ -138,29 +138,42 @@ class RetrievalManager:
             return [{"text": doc, "score": 0.0} for i, doc in enumerate(documents[:top_k])]
 
     @track_execution_time
-    async def entity_recall(self, main_entity: str,assisted_search_entity: list,
-                   top_k: int = 5) -> List[Dict[str, Any]]:
+    async def entity_recall(self, main_entity: str, assisted_search_entity: list,
+                   recall_top_k: int = 5, max_results: int = None) -> List[str]:
         """
         执行实体召回
-        :param main_entity: 查询实体
-        :param assisted_search_entity: 辅助搜索实体
-        :param top_k: 返回结果数量
+
+        Args:
+            main_entity: 主查询实体
+            assisted_search_entity: 辅助搜索实体列表
+            recall_top_k: 每次单实体召回返回的数量(默认5)
+            max_results: 最终返回的最大数量,如果为None则返回所有召回结果(默认None)
+
+        Returns:
+            List[str]: 实体文本内容列表
+
+        Note:
+            实际返回数量 = min(max_results, 主实体召回数 + 所有辅助实体召回数)
+            如果不设置max_results,可能返回较多结果(取决于辅助实体数量)
         """
+        self.logger.info(f"[entity_recall] 开始召回, recall_top_k={recall_top_k}, max_results={max_results}, 主实体='{main_entity}', 辅助实体数量={len(assisted_search_entity)}")
+
         collection_name = "first_bfp_collection_entity"
         # 主实体搜索 - 使用异步方法
         entity_result = await self.async_multi_stage_recall(
             collection_name=collection_name,
             query_text=main_entity,
-            hybrid_top_k=20,  # 从默认50降到20
-            top_k=top_k
+            hybrid_top_k=50,
+            top_k=recall_top_k
         )
+        self.logger.info(f"[entity_recall] 主实体召回完成, 返回 {len(entity_result)} 个结果")
 
         assist_tasks = [
             self.async_multi_stage_recall(
                 collection_name=collection_name,
                 query_text=assisted_search_entity,
-                hybrid_top_k=20,  # 从默认50降到20
-                top_k=top_k
+                hybrid_top_k=50,
+                top_k=recall_top_k
             ) for assisted_search_entity in assisted_search_entity
         ]
         # 辅助搜索,异步并发
@@ -174,9 +187,21 @@ class RetrievalManager:
 
         all_results = entity_result + assist_results
 
+        # if self.rerank_model_type == 'silicoflow_rerank_model':
+        #     with open("temp\entity_bfp_recall\silicoflow_rerank_model.json", "w", encoding="utf-8") as f:
+        #         json.dump(all_results, f, ensure_ascii=False, indent=4)
+        # elif self.rerank_model_type == 'lq_rerank_model':
+        #     with open("temp\entity_bfp_recall\lq_rerank_model.json", "w", encoding="utf-8") as f:
+        #         json.dump(all_results, f, ensure_ascii=False, indent=4)
+        # 去重并提取文本内容
         entity_list = list(set([item['text_content'] for item in all_results]))
-        self.logger.info(f"entity_list:{entity_list}")
 
+        # 如果设置了max_results,进行截断
+        if max_results is not None and len(entity_list) > max_results:
+            entity_list = entity_list[:max_results]
+            self.logger.info(f"[entity_recall] 结果截断到 max_results={max_results}")
+
+        self.logger.info(f"entity_list_len:{len(entity_list)}")
         return entity_list
     
     @track_execution_time
@@ -193,6 +218,8 @@ class RetrievalManager:
         import time
         start_time = time.time()
 
+        self.logger.info(f"[async_bfp_recall] 开始召回, top_k={top_k}, 实体数量={len(entity_list)}, 背景='{background[:50]}...'")
+
         # 异步并发召回编制依据
         collection_name = "first_bfp_collection_test"
 
@@ -202,14 +229,13 @@ class RetrievalManager:
             self.async_multi_stage_recall(
                 collection_name=collection_name,
                 query_text=entity,
-                hybrid_top_k=20,  # 从50降到20,减少60%的混合搜索时间
+                hybrid_top_k=10,  # 从50降到20,减少60%的混合搜索时间
                 top_k=top_k
             ) for entity in entity_list
         ]
 
         bfp_tasks_list = await asyncio.gather(*bfp_tasks,return_exceptions=True)
         gather_end = time.time()
-        gather_time = gather_end - gather_start
 
         bfp_results = []
         for res in bfp_tasks_list:
@@ -218,6 +244,8 @@ class RetrievalManager:
             else:
                 bfp_results.extend(res)
 
+        self.logger.info(f"[async_bfp_recall] 第一阶段召回完成, 共召回 {len(bfp_results)} 个文档")
+
         # BFP召回结果已经通过multi_stage_recall进行了重排序,保持原有顺序
         # 只对第一次重排序得分大于0.8的文档进行二次重排序
         high_score_results = [item for item in bfp_results if item.get('rerank_score', 0) > 0.8]
@@ -225,10 +253,17 @@ class RetrievalManager:
 
         self.logger.info(f"筛选结果:高分文档(>0.8) {len(high_score_results)} 个,低分文档(≤0.8) {len(low_score_results)} 个")
 
-        # 如果没有高分文档,直接返回原始结果
+        # 如果没有高分文档,直接返回top_k个结果(按hybrid_similarity排序)
         if not high_score_results:
-            self.logger.info("没有得分大于0.8的文档,跳过二次重排序,直接返回原始结果")
-            return bfp_results
+            self.logger.info(f"没有得分大于0.8的文档,跳过二次重排序,返回top_k={top_k}个结果(按hybrid_similarity排序)")
+            # 按 hybrid_similarity 降序排序,返回 top_k 个
+            sorted_results = sorted(bfp_results, key=lambda x: x.get('hybrid_similarity', 0), reverse=True)
+            return sorted_results[:top_k]
+
+        # 检查background是否为空,如果为空则跳过二次重排序
+        if not background or not background.strip():
+            self.logger.warning("background为空,跳过二次重排序,直接返回高分文档")
+            return high_score_results
 
         # 提取高分文档的文本内容用于二次重排
         high_score_text_content = list(set([item['text_content'] for item in high_score_results]))
@@ -236,9 +271,10 @@ class RetrievalManager:
 
         # 二次重排 - 使用配置的重排序模型
         rerank_start = time.time()
-        bfp_rerank_result = self._get_rerank_results(background, high_score_text_content, 5)
+        # 使用传入的 top_k 参数,而不是硬编码为5
+        bfp_rerank_result = self._get_rerank_results(background, high_score_text_content, top_k)
         rerank_end = time.time()
-        self.logger.info(f"二次重排序耗时: {rerank_end - rerank_start:.3f}秒")
+        self.logger.info(f"二次重排序耗时: {rerank_end - rerank_start:.3f}秒, top_k={top_k}")
 
         # 根据重排结果重新组织数据
         reorganize_start = time.time()
@@ -259,7 +295,7 @@ class RetrievalManager:
         total_time = reorganize_end - start_time
 
         self.logger.info(f"结果重组耗时: {reorganize_end - reorganize_start:.3f}秒")
-        self.logger.info(f"二次重排完成,返回 {len(final_results)} 个高分文档,丢弃 {len(low_score_results)} 个低分文档")
+        self.logger.info(f"二次重排完成,返回 {len(final_results)} 个高分文档(top_k={top_k}),丢弃 {len(low_score_results)} 个低分文档")
         self.logger.info(f"[async_bfp_recall] 总耗时: {total_time:.3f}秒 (召回: {gather_end-gather_start:.3f}s + 重排: {rerank_end-rerank_start:.3f}s + 其他: {total_time-(gather_end-gather_start)-(rerank_end-rerank_start):.3f}s)")
 
         return final_results
@@ -540,7 +576,6 @@ class RetrievalManager:
             start_time = time.time()
 
             # 第一阶段:混合搜索召回(向量+BM25)
-            hybrid_start = time.time()
             hybrid_results = await asyncio.to_thread(
                 self.hybrid_search_recall,
                 collection_name=collection_name,

+ 1 - 1
foundation/database/base/vector/milvus_vector.py

@@ -59,7 +59,7 @@ class MilvusVectorManager(BaseVectorDB):
         # 初始化文本向量化模型
         mh = _get_model_handler()
         if mh:
-            self.emdmodel = mh._get_lq_qwen3_8b_emd()
+            self.emdmodel = mh.get_embedding_model()
         else:
             raise ImportError("无法导入model_handler,无法初始化嵌入模型")
 

+ 0 - 98
test_prep_basis_review.py

@@ -1,98 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-测试prep_basis_review方法的简单脚本
-"""
-
-import asyncio
-import sys
-import os
-
-# 添加项目根目录到sys.path
-current_file = os.path.abspath(__file__)
-root_dir = os.path.dirname(os.path.dirname(os.path.dirname(current_file)))
-sys.path.append(root_dir)
-
-from core.base.task_models import TaskFileInfo
-from core.construction_review.component.ai_review_engine import AIReviewEngine
-
-async def test_prep_basis_review():
-    """测试编制依据审查方法"""
-    print("🚀 开始测试 prep_basis_review 方法")
-
-    # 模拟编制依据文本内容
-    test_content = """
-    (1)《架桥机通用技术条件》(GB/T 26470-2011);
-    (2)《起重机设计规范》(GB/T 3811-2008);
-    (3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014);
-    (4)《电气装置安装工程 起重机电气装置施工及验收规范》(GB 50256-2014);
-    (5)《起重设备安装工程施工及验收规范》(GB50278-2010);
-    (6)《施工现场机械设备检查技术规范》(JGJ 160-2016);
-    (7)《公路桥涵施工技术规范》(JTG/T 3650-2020);
-    (8)《建设工程安全生产管理条例》《四川省安全生产条例》(2023);
-    (9)《重要用途钢丝绳》(GB 8918-2006);
-    (10)《起重机用钢丝绳》(GB T 34198-2017);
-    """
-
-    # 创建模拟的TaskFileInfo
-    file_info_dict = {
-        'file_id': "test_file_001",
-        'callback_task_id': "test_callback_001",
-        'user_id': "test_user_001",
-        'review_config': [],
-        'project_plan_type': "construction_plan",
-        'tendency_review_role': "safety_reviewer",
-        'file_name': "test_document.doc",
-        'file_type': "doc",
-        'launched_at': 1234567890
-    }
-    task_info = TaskFileInfo(file_info_dict)
-
-    # 创建AIReviewEngine实例
-    ai_engine = AIReviewEngine(task_info, max_concurrent_reviews=4)
-
-    # 准备审查数据
-    review_data = {
-        'content': test_content.strip(),
-        'max_concurrent': 4
-    }
-
-    # 执行编制依据审查
-    try:
-        result = await ai_engine.prep_basis_review(
-            review_data=review_data,
-            trace_id="test_prep_basis_001",
-            state={"test": True},
-            stage_name="编制依据审查测试"
-        )
-
-        print("✅ 编制依据审查成功完成!")
-        print(f"📊 审查结果统计:")
-        print(f"   - 成功状态: {result.get('success', False)}")
-        print(f"   - 执行时间: {result.get('execution_time', 0):.2f}秒")
-        print(f"   - 总编制依据数: {result.get('total_basis_items', 0)}")
-        print(f"   - 有效项目数: {result.get('valid_items', 0)}")
-        print(f"   - 标准项目数: {result.get('standard_items', 0)}")
-
-        # 显示部分审查结果详情
-        review_results = result.get('review_results', [])
-        if review_results:
-            print(f"📋 审查结果详情 (前2个批次):")
-            for i, batch in enumerate(review_results[:2]):
-                print(f"   批次 {i+1}: {len(batch)} 条编制依据")
-                for j, item in enumerate(batch[:3]):  # 每批次显示前3条
-                    name = item.get('name', 'Unknown')[:30] + "..." if len(item.get('name', '')) > 30 else item.get('name', 'Unknown')
-                    status = item.get('status', 'Unknown')
-                    is_standard = item.get('is_standard', False)
-                    print(f"     {j+1}. {name}: {status} ({'标准' if is_standard else '非标准'})")
-        else:
-            print("⚠️ 没有返回审查结果")
-
-    except Exception as e:
-        print(f"❌ 测试失败: {str(e)}")
-        import traceback
-        traceback.print_exc()
-
-if __name__ == "__main__":
-    asyncio.run(test_prep_basis_review())

+ 0 - 141
test_prep_basis_workflow.py

@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-测试完整的编制依据审查工作流
-"""
-
-import asyncio
-import sys
-import os
-
-# 添加项目根目录到sys.path
-current_file = os.path.abspath(__file__)
-root_dir = os.path.dirname(os.path.dirname(os.path.dirname(current_file)))
-sys.path.append(root_dir)
-
-from core.base.task_models import TaskFileInfo
-from core.construction_review.workflows.ai_review_workflow import AIReviewWorkflow
-
-async def test_prep_basis_workflow():
-    """测试完整的编制依据审查工作流"""
-    print("开始测试编制依据审查工作流")
-
-    # 模拟结构化内容,包含编制依据
-    structured_content = {
-        'file_id': 'test_file_002',
-        'file_name': 'construction_plan.doc',
-        'prep_basis': '''
-        (1)《架桥机通用技术条件》(GB/T 26470-2011);
-        (2)《起重机设计规范》(GB/T 3811-2008);
-        (3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014);
-        (4)《电气装置安装工程 起重机电气装置施工及验收规范》(GB 50256-2014);
-        (5)《起重设备安装工程施工及验收规范》(GB50278-2010);
-        (6)《施工现场机械设备检查技术规范》(JGJ 160-2016);
-        (7)《公路桥涵施工技术规范》(JTG/T 3650-2020);
-        (8)《建设工程安全生产管理条例》《四川省安全生产条例》(2023);
-        (9)《重要用途钢丝绳》(GB 8918-2006);
-        (10)《起重机用钢丝绳》(GB T 34198-2017);
-        (11)《起重机钢丝绳保养、维护、检验和报废》(GBT5972-2023);
-        (12)《建筑施工起重吊装工程安全技术规范》(JGJ 276-2012);
-        (13)《建筑施工高处作业安全技术规范》(JGJ 80-2016);
-        ''',
-        'chunks': [
-            {
-                'section_label': '第一章 编制依据',
-                'content': '''第一章 编制依据
-
-本专项施工方案编制依据如下:
-(1)《架桥机通用技术条件》(GB/T 26470-2011);
-(2)《起重机设计规范》(GB/T 3811-2008);
-(3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014);
-                ''',
-                'page': '1'
-            },
-            {
-                'section_label': '第二章 工程概况',
-                'content': '''第二章 工程概况
-
-本工程为某某大桥上部结构施工项目...
-                ''',
-                'page': '2'
-            }
-        ],
-        'outline': {
-            'chapters': [
-                {'title': '第一章 编制依据', 'page': '1'},
-                {'title': '第二章 工程概况', 'page': '2'}
-            ]
-        }
-    }
-
-    # 创建模拟的TaskFileInfo,包含prep_basis_check配置
-    file_info_dict = {
-        'file_id': "test_file_002",
-        'callback_task_id': "test_callback_prep_basis_002",
-        'user_id': "test_user_002",
-        'review_config': ['completeness_check', 'prep_basis_check'],  # 包含编制依据审查
-        'project_plan_type': "construction_plan",
-        'tendency_review_role': "safety_reviewer",
-        'file_name': "test_prep_basis_document.doc",
-        'file_type': "doc",
-        'launched_at': 1234567890
-    }
-    task_info = TaskFileInfo(file_info_dict)
-
-    try:
-        # 创建AI审查工作流实例
-        workflow = AIReviewWorkflow(
-            task_file_info=task_info,
-            structured_content=structured_content,
-            progress_manager=None,  # 简化测试,不使用进度管理器
-            max_review_units=1,     # 只测试一个单元,减少执行时间
-            review_mode="first"
-        )
-
-        print("工作流创建成功,开始执行...")
-
-        # 执行工作流
-        result = await workflow.execute()
-
-        print("工作流执行完成!")
-        print(f"执行结果:")
-        print(f"  - 文件ID: {result.get('file_id')}")
-        print(f"  - 总单元数: {result.get('total_units')}")
-        print(f"  - 成功单元数: {result.get('successful_units')}")
-        print(f"  - 失败单元数: {result.get('failed_units')}")
-        print(f"  - 状态: {result.get('status')}")
-
-        # 检查审查结果中是否包含编制依据审查
-        review_results = result.get('review_results', [])
-        if review_results:
-            print(f"  - 审查结果数量: {len(review_results)}")
-
-            # 查找编制依据审查结果
-            prep_basis_result = None
-            for item in review_results:
-                if isinstance(item, dict) and 'review_results' in item:
-                    # 这可能是编制依据审查结果
-                    if 'total_basis_items' in item:
-                        prep_basis_result = item
-                        break
-
-            if prep_basis_result:
-                print(f"编制依据审查结果:")
-                print(f"  - 成功状态: {prep_basis_result.get('success', False)}")
-                print(f"  - 总编制依据数: {prep_basis_result.get('total_basis_items', 0)}")
-                print(f"  - 有效项目数: {prep_basis_result.get('valid_items', 0)}")
-                print(f"  - 标准项目数: {prep_basis_result.get('standard_items', 0)}")
-                print(f"  - 执行时间: {prep_basis_result.get('execution_time', 0):.2f}秒")
-            else:
-                print("未找到编制依据审查结果")
-        else:
-            print("没有审查结果返回")
-
-    except Exception as e:
-        print(f"测试失败: {str(e)}")
-        import traceback
-        traceback.print_exc()
-
-if __name__ == "__main__":
-    asyncio.run(test_prep_basis_workflow())

+ 0 - 138
test_sensitive_word_checker.py

@@ -1,138 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-敏感词检测器测试脚本
-"""
-
-import asyncio
-from core.construction_review.component.reviewers.utils import (
-    SensitiveWordChecker,
-    check_sensitive_words,
-    check_sensitive_words_async,
-    format_check_results
-)
-
-
-def test_sync():
-    """测试同步检测"""
-    print("=" * 60)
-    print("测试同步敏感词检测")
-    print("=" * 60)
-    
-    # 初始化
-    print("\n1. 初始化敏感词检测器...")
-    stats = SensitiveWordChecker.initialize()
-    print(f"   加载统计: {stats}")
-    
-    # 测试文本
-    test_texts = [
-        "这是一段正常的文本内容",
-        "施工方案中使用了最好的材料",
-        "本项目采用国内最先进的技术",
-    ]
-    
-    print("\n2. 开始检测...")
-    for i, text in enumerate(test_texts, 1):
-        print(f"\n   测试 {i}: {text}")
-        results = check_sensitive_words(text)
-        
-        if results:
-            print(f"   ⚠️  发现 {len(results)} 个敏感词:")
-            for item in results:
-                print(f"      - 敏感词: '{item['word']}' | 位置: {item['position']}-{item['end_position']} | 来源: {item['source']}")
-        else:
-            print("   ✓ 未发现敏感词")
-    
-    # 测试格式化结果
-    print("\n3. 测试格式化结果...")
-    text = "本项目采用最好的材料和最先进的技术"
-    results = check_sensitive_words(text)
-    formatted = format_check_results(results, text)
-    print(f"   格式化结果: {formatted}")
-
-
-async def test_async():
-    """测试异步检测"""
-    print("\n" + "=" * 60)
-    print("测试异步敏感词检测(并发)")
-    print("=" * 60)
-    
-    test_texts = [
-        "这是第一段测试文本",
-        "这是第二段包含最好的文本",
-        "这是第三段包含最先进的文本",
-        "这是第四段正常文本",
-        "这是第五段包含绝对化用语的文本",
-    ]
-    
-    print(f"\n并发检测 {len(test_texts)} 段文本...")
-    
-    # 并发执行
-    tasks = [check_sensitive_words_async(text) for text in test_texts]
-    results_list = await asyncio.gather(*tasks)
-    
-    # 输出结果
-    for i, (text, results) in enumerate(zip(test_texts, results_list), 1):
-        print(f"\n文本 {i}: {text}")
-        if results:
-            print(f"⚠️  发现 {len(results)} 个敏感词:")
-            for item in results:
-                print(f"   - {item['word']} (位置: {item['position']}, 来源: {item['source']})")
-        else:
-            print("✓ 未发现敏感词")
-
-
-def test_performance():
-    """测试性能"""
-    print("\n" + "=" * 60)
-    print("性能测试")
-    print("=" * 60)
-    
-    import time
-    
-    # 生成大量文本
-    test_text = "这是一段包含最好、最先进、绝对等敏感词的长文本。" * 100
-    
-    print(f"\n测试文本长度: {len(test_text)} 字符")
-    
-    # 测试检测速度
-    iterations = 100
-    start_time = time.time()
-    
-    for _ in range(iterations):
-        results = check_sensitive_words(test_text)
-    
-    end_time = time.time()
-    elapsed = end_time - start_time
-    avg_time = elapsed / iterations * 1000
-    
-    print(f"执行 {iterations} 次检测")
-    print(f"总耗时: {elapsed:.3f} 秒")
-    print(f"平均耗时: {avg_time:.3f} 毫秒/次")
-    print(f"检测速度: {iterations/elapsed:.2f} 次/秒")
-
-
-def main():
-    """主函数"""
-    print("\n" + "=" * 60)
-    print("敏感词检测系统测试")
-    print("=" * 60)
-    
-    # 同步测试
-    test_sync()
-    
-    # 异步测试
-    asyncio.run(test_async())
-    
-    # 性能测试
-    test_performance()
-    
-    print("\n" + "=" * 60)
-    print("测试完成!")
-    print("=" * 60)
-
-
-if __name__ == "__main__":
-    main()
-

+ 123 - 0
utils_test/AI_Review_Test/test_rag_enhanced_check.py

@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+测试AI审查引擎的rag_enhanced_check方法
+用于快速验证模块功能是否正常实现
+"""
+
+import sys
+import os
+import time
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from core.base.task_models import TaskFileInfo
+from core.construction_review.component.ai_review_engine import AIReviewEngine
+from foundation.observability.logger.loggering import server_logger as logger
+
+
+
+def test_rag_enhanced_check():
+    """
+    测试rag_enhanced_check方法的基本功能
+    """
+    print("=" * 60)
+    print("测试rag_enhanced_check方法")
+    print("=" * 60)
+
+    # 准备测试数据
+    query_content = """相关准备工作
+1、技术准备
+四川公路桥梁建设集团有限公司 镇广C4项目经理部JQ220t-40m架桥机安装及拆除专项施工方案
+(1)应将架桥机随机文件和有关技术资料准备齐全,认真组织安装人员学习阅
+读本方案,并以此为依据结合实际工况及地貌拟定有关安装施工方案。
+(2)安装前,应对设备散件进行全面检查、清理,如发现有损伤、腐蚀或其它
+缺陷,应在安装前予以处理,合格后方可安装。
+(3)应对架桥机运梁轨道进行如下检查:
+1)架桥机运梁轨道基础应有足够的承压能力,应能满足架桥机运梁平车载重安
+全运行。轨道下部如有支垫是否具有强大的刚性和一定的密度,特殊端部的支垫必
+须采用刚性支垫,严禁使用腐蚀的枕木。
+2)轨道钢轨正面、侧面的不平度不应大于1/1500,全长范围内不应大于10毫
+米。
+3)轨道安装的允许偏差:
+①轨道实际中心线对轨道设计中心线的位置偏移允许偏差为3毫米。
+②轨距允许偏差为±5毫米。
+③轨道纵向不平度应小于 1/1500,且全行程不超过10毫米。
+④同一断面上两轨道的标高相对偏差不超过5毫米。
+4)轨道接头应符合下列要求:
+①接头左、右、上三面错位不应大于2毫米。
+②两平行轨道接口的位置应错开,其错开距离不应等于架桥机运梁平车前后车
+轮的轮距。
+第19页
+四川公路桥梁建设集团有限公司 镇广C4项目经理部JQ220t-40m架桥机安装及拆除专项施工方案
+③接头间隙应为1~2毫米,伸缩缝接头间隙应符合设计要求,其偏差不应大于
+±1毫米。
+④轨道的悬空部位是否得到了加强。
+5)施工期间的基本要求及安全规定
+①架桥机安装属于高空作业,施工前技术员及现场指挥员向参加施工的所有人
+员详细介绍安装工序、技术要求和指挥信号。
+②严禁在施工的架桥机下面逗留通过,与安装施工无关人员不准擅自进入施工
+现场。
+③现场施工人员必须按有关安全规定佩戴好安全用品(安全带、安全帽、绝缘
+鞋等)。
+④施工现场使用的氧气瓶、乙炔瓶必须保证立放并固定好,两瓶之间距离不得
+小于5m。
+6)架桥机组装
+①安装原则:按从下至上的顺序进行安装,且必须在下部结构安装固定牢固以
+后,才能进行上部结构的安装。同时,在安装过程中,在运输及吊装能力允许的情
+况下,尽量保持设备各部件的整体性,一方面能有效减少危险作业点,另一方面有
+利于提高设备现场安装工效。
+②安装场地:架桥拼装在桥台台背后路基上,拼装场地选址位于路基上,拼装
+场地面积宽15m,长 100m,架桥机主体最大宽度为9m(移动轨道 18m),路基宽度
+满足安装场地所需,路基及便道顶面承载力满足接卸拼装所需的承载要求。
+③辅助机械设备:根据施工现场实际情况,选择1台25T汽车吊作为主要安装
+设备,架桥机主梁拼装采用分节进行吊装,整机重量最大的组件为三节主梁同时起
+吊,总重18t,采用2 台50t汽车吊满足吊装要求。"""
+    unit_content = {
+        "content": query_content
+    }
+
+    # 创建TaskFileInfo实例
+    file_info = {
+        'file_id': 'test_file_001',
+        'user_id': 'test_user',
+        'callback_task_id': 'test_task_001',
+        'file_name': 'test.pdf',
+        'file_type': 'pdf'
+    }
+    task_file_info = TaskFileInfo(file_info)
+
+    # 创建AIReviewEngine实例
+    review_engine = AIReviewEngine(task_file_info)
+
+    # 执行测试
+    print("\n[输入参数]")
+    print(f"  query_content: {query_content}")
+
+    start_time = time.time()
+    result = review_engine.rag_enhanced_check(unit_content)
+    logger.info(f"rag_enhanced_check_result {result}")
+    end_time = time.time()
+
+    # 输出结果
+    print("\n[输出结果]")
+    print(f"  file_name: {result.get('file_name')}")
+    print(f"  text_content: {result.get('text_content')}")
+    print(f"  metadata: {result.get('metadata')}")
+    print(f"  retrieval_status: {result.get('retrieval_status', 'N/A')}")
+
+    print(f"\n[执行时间] {end_time - start_time:.4f}秒")
+
+    # 验证断言
+    if result.get('retrieval_status') == 'no_results':
+        print("\n[测试结果] 通过 - RAG检索无结果")
+    else:
+        assert 'file_name' in result, "应包含file_name字段"
+        assert 'metadata' in result, "应包含metadata字段"
+        print("\n[测试结果] 通过 - rag_enhanced_check方法功能正常")
+
+    return result
+
+
+if __name__ == "__main__":
+    test_rag_enhanced_check()

+ 56 - 75
utils_test/RAG_Test/debug_query_extract.py

@@ -18,85 +18,66 @@ def debug_query_extract():
     """
     调试query_extract方法
     """
-    print("="*60)
-    print("调试QueryRewriteManager.query_extract方法")
-    print("="*60)
-
     # 测试数据
-    review_content = "深度大于3m的基坑开挖、有地下水侵扰的基坑清底封底,每个工作班至少巡查两遍。"
-    print(f"原始输入内容: {review_content}")
-    print(f"内容长度: {len(review_content)}")
-
-    try:
-        # 手动构建提示词模板进行调试
-        from foundation.ai.rag.retrieval.query_rewrite import prompt_loader, generate_model_client
-        import uuid
-        import asyncio
-
-        # 获取提示词模板
-        task_prompt = prompt_loader.get_prompt_template(
-            reviewer_type="query_extract",
-            review_content=review_content
-        )
-
-        print(f"\n[DEBUG] 提示词模板类型: {type(task_prompt)}")
-
-        # 尝试格式化消息
-        try:
-            messages = task_prompt.format_messages()
-            print(f"[DEBUG] 消息数量: {len(messages)}")
-            print(f"[DEBUG] 系统消息: {messages[0].content[:200]}...")
-            print(f"[DEBUG] 用户消息: {messages[1].content[:300]}...")
-
-            # 检查用户消息是否包含正确的review_content
-            if review_content in messages[1].content:
-                print("[OK] review_content 正确传递到提示词")
-            else:
-                print("[ERROR] review_content 未正确传递到提示词")
-                print(f"[DEBUG] 期望内容: {review_content}")
-                print(f"[DEBUG] 用户消息实际内容: {messages[1].content}")
-
-        except Exception as e:
-            print(f"[ERROR] 格式化消息失败: {e}")
-            return
+    review_content = """
+相关准备工作
+1、技术准备
+四川公路桥梁建设集团有限公司 镇广C4项目经理部JQ220t-40m架桥机安装及拆除专项施工方案
+(1)应将架桥机随机文件和有关技术资料准备齐全,认真组织安装人员学习阅
+读本方案,并以此为依据结合实际工况及地貌拟定有关安装施工方案。
+(2)安装前,应对设备散件进行全面检查、清理,如发现有损伤、腐蚀或其它
+缺陷,应在安装前予以处理,合格后方可安装。
+(3)应对架桥机运梁轨道进行如下检查:
+1)架桥机运梁轨道基础应有足够的承压能力,应能满足架桥机运梁平车载重安
+全运行。轨道下部如有支垫是否具有强大的刚性和一定的密度,特殊端部的支垫必
+须采用刚性支垫,严禁使用腐蚀的枕木。
+2)轨道钢轨正面、侧面的不平度不应大于1/1500,全长范围内不应大于10毫
+米。
+3)轨道安装的允许偏差:
+①轨道实际中心线对轨道设计中心线的位置偏移允许偏差为3毫米。
+②轨距允许偏差为±5毫米。
+③轨道纵向不平度应小于 1/1500,且全行程不超过10毫米。
+④同一断面上两轨道的标高相对偏差不超过5毫米。
+4)轨道接头应符合下列要求:
+①接头左、右、上三面错位不应大于2毫米。
+②两平行轨道接口的位置应错开,其错开距离不应等于架桥机运梁平车前后车
+轮的轮距。
+第19页
+四川公路桥梁建设集团有限公司 镇广C4项目经理部JQ220t-40m架桥机安装及拆除专项施工方案
+③接头间隙应为1~2毫米,伸缩缝接头间隙应符合设计要求,其偏差不应大于
+±1毫米。
+④轨道的悬空部位是否得到了加强。
+5)施工期间的基本要求及安全规定
+①架桥机安装属于高空作业,施工前技术员及现场指挥员向参加施工的所有人
+员详细介绍安装工序、技术要求和指挥信号。
+②严禁在施工的架桥机下面逗留通过,与安装施工无关人员不准擅自进入施工
+现场。
+③现场施工人员必须按有关安全规定佩戴好安全用品(安全带、安全帽、绝缘
+鞋等)。
+④施工现场使用的氧气瓶、乙炔瓶必须保证立放并固定好,两瓶之间距离不得
+小于5m。
+6)架桥机组装
+①安装原则:按从下至上的顺序进行安装,且必须在下部结构安装固定牢固以
+后,才能进行上部结构的安装。同时,在安装过程中,在运输及吊装能力允许的情
+况下,尽量保持设备各部件的整体性,一方面能有效减少危险作业点,另一方面有
+利于提高设备现场安装工效。
+②安装场地:架桥拼装在桥台台背后路基上,拼装场地选址位于路基上,拼装
+场地面积宽15m,长 100m,架桥机主体最大宽度为9m(移动轨道 18m),路基宽度
+满足安装场地所需,路基及便道顶面承载力满足接卸拼装所需的承载要求。
+③辅助机械设备:根据施工现场实际情况,选择1台25T汽车吊作为主要安装
+设备,架桥机主梁拼装采用分节进行吊装,整机重量最大的组件为三节主梁同时起
+吊,总重18t,采用2 台50t汽车吊满足吊装要求。
 
-        # 构建任务提示信息
-        task_prompt_info = {
-            "task_prompt": task_prompt,
-            "task_name": "query_extract"
-        }
-
-        # 生成trace_id
-        trace_id = str(uuid.uuid4())
-        print(f"[DEBUG] Trace ID: {trace_id}")
-
-        # 调用模型生成接口
-        print("[DEBUG] 开始调用模型...")
-        model_response = asyncio.run(generate_model_client.get_model_generate_invoke(
-            trace_id=trace_id,
-            task_prompt_info=task_prompt_info
-        ))
-
-        print(f"[DEBUG] 模型响应: {model_response}")
-
-        # 使用原始方法进行对比测试
-        print("\n" + "="*40)
-        print("使用原始QueryRewriteManager方法测试")
-        print("="*40)
-
-        query_rewrite_manager = QueryRewriteManager()
-        start_time = time.time()
-        result = query_rewrite_manager.query_extract(review_content)
-        end_time = time.time()
-        elapsed_time = end_time - start_time
+"""
+    query_rewrite_manager = QueryRewriteManager()
+    start_time = time.time()
+    result = query_rewrite_manager.query_extract(review_content)
+    end_time = time.time()
+    elapsed_time = end_time - start_time
 
-        print(f"[OK] 原始方法提取完成,耗时: {elapsed_time:.2f}秒")
-        print(f"[OK] 原始方法返回结果: {result}")
+    print(f"[OK] 原始方法提取完成,耗时: {elapsed_time:.2f}秒")
+    print(f"[OK] 原始方法返回结果: {result}")
 
-    except Exception as e:
-        print(f"[ERROR] 调试失败: {str(e)}")
-        import traceback
-        traceback.print_exc()
 
 
 def main():

+ 37 - 4
utils_test/RAG_Test/test_entity_bfp_recall.py

@@ -10,10 +10,43 @@ background = "JQ220t-40m架桥机安装及拆除"
 
 @track_execution_time
 def main():
+    print("="*60)
+    print("实体增强召回测试")
+    print("="*60)
+    print(f"主实体: {entity}")
+    print(f"辅助实体: {search_keywords}")
+    print(f"背景信息: {background}")
+    print("-"*60)
 
-    entity_list = asyncio.run(retrieval_manager.entity_recall(entity,search_keywords,top_k=5))
-    bfp_result = asyncio.run(retrieval_manager.async_bfp_recall(entity_list,background,top_k=10))
-    with open("temp\entity_bfp_recall\entity_bfp_recall.json", "w", encoding="utf-8") as f:
+    # 使用新参数调用 entity_recall
+    # recall_top_k=5: 每个实体召回5个结果
+    # max_results=20: 最终返回最多20个实体文本
+    entity_list = asyncio.run(retrieval_manager.entity_recall(
+        entity,
+        search_keywords,
+        recall_top_k=5,      # 每次单实体召回返回5个
+        max_results=20       # 最终最多返回20个
+    ))
+
+    print(f"\n✅ 实体召回完成, 共召回 {len(entity_list)} 个实体")
+    print(f"实体列表前5个: {entity_list[:5]}")
+
+    # 使用 top_k 参数调用 async_bfp_recall
+    # top_k=3: 二次重排后最多返回3个BFP文档
+    bfp_result = asyncio.run(retrieval_manager.async_bfp_recall(
+        entity_list,
+        background,
+        top_k=3
+    ))
+
+    print(f"\n✅ BFP召回完成, 共召回 {len(bfp_result)} 个文档")
+
+    # 保存结果
+    with open("temp/entity_bfp_recall/entity_bfp_recall.json", "w", encoding="utf-8") as f:
         json.dump(bfp_result, f, ensure_ascii=False, indent=4)
 
-main()
+    print(f"\n✅ 结果已保存到: temp/entity_bfp_recall/entity_bfp_recall.json")
+    print("="*60)
+
+if __name__ == "__main__":
+    main()

+ 47 - 1
utils_test/RAG_Test/test_rag_enhanced_check.py

@@ -19,7 +19,53 @@ review_engine = AIReviewEngine()
 # 记录开始时间
 import time
 start_time = time.time()
-query = """ # 5. 1 防护栏杆5.1 防护栏杆\n\n## 部分 5\n\n5.1.7 防护栏杆使用过程中应避免在构件上额外施加长期的外力作用或施加振动荷载 不应悬挂重5.1.7 防护栏杆使用过程中应避免在构件上额外施加长期的外力作用或施加振动荷载,不应悬挂重物。 发现锈蚀、腐蚀、松动或损坏的,应及时进行维修、更换。物。发现锈蚀、腐蚀、松动或损坏的,应及时进行维修、更换。"""
+query = """ 相关准备工作
+1、技术准备
+四川公路桥梁建设集团有限公司 镇广C4项目经理部JQ220t-40m架桥机安装及拆除专项施工方案
+(1)应将架桥机随机文件和有关技术资料准备齐全,认真组织安装人员学习阅
+读本方案,并以此为依据结合实际工况及地貌拟定有关安装施工方案。
+(2)安装前,应对设备散件进行全面检查、清理,如发现有损伤、腐蚀或其它
+缺陷,应在安装前予以处理,合格后方可安装。
+(3)应对架桥机运梁轨道进行如下检查:
+1)架桥机运梁轨道基础应有足够的承压能力,应能满足架桥机运梁平车载重安
+全运行。轨道下部如有支垫是否具有强大的刚性和一定的密度,特殊端部的支垫必
+须采用刚性支垫,严禁使用腐蚀的枕木。
+2)轨道钢轨正面、侧面的不平度不应大于1/1500,全长范围内不应大于10毫
+米。
+3)轨道安装的允许偏差:
+①轨道实际中心线对轨道设计中心线的位置偏移允许偏差为3毫米。
+②轨距允许偏差为±5毫米。
+③轨道纵向不平度应小于 1/1500,且全行程不超过10毫米。
+④同一断面上两轨道的标高相对偏差不超过5毫米。
+4)轨道接头应符合下列要求:
+①接头左、右、上三面错位不应大于2毫米。
+②两平行轨道接口的位置应错开,其错开距离不应等于架桥机运梁平车前后车
+轮的轮距。
+第19页
+四川公路桥梁建设集团有限公司 镇广C4项目经理部JQ220t-40m架桥机安装及拆除专项施工方案
+③接头间隙应为1~2毫米,伸缩缝接头间隙应符合设计要求,其偏差不应大于
+±1毫米。
+④轨道的悬空部位是否得到了加强。
+5)施工期间的基本要求及安全规定
+①架桥机安装属于高空作业,施工前技术员及现场指挥员向参加施工的所有人
+员详细介绍安装工序、技术要求和指挥信号。
+②严禁在施工的架桥机下面逗留通过,与安装施工无关人员不准擅自进入施工
+现场。
+③现场施工人员必须按有关安全规定佩戴好安全用品(安全带、安全帽、绝缘
+鞋等)。
+④施工现场使用的氧气瓶、乙炔瓶必须保证立放并固定好,两瓶之间距离不得
+小于5m。
+6)架桥机组装
+①安装原则:按从下至上的顺序进行安装,且必须在下部结构安装固定牢固以
+后,才能进行上部结构的安装。同时,在安装过程中,在运输及吊装能力允许的情
+况下,尽量保持设备各部件的整体性,一方面能有效减少危险作业点,另一方面有
+利于提高设备现场安装工效。
+②安装场地:架桥拼装在桥台台背后路基上,拼装场地选址位于路基上,拼装
+场地面积宽15m,长 100m,架桥机主体最大宽度为9m(移动轨道 18m),路基宽度
+满足安装场地所需,路基及便道顶面承载力满足接卸拼装所需的承载要求。
+③辅助机械设备:根据施工现场实际情况,选择1台25T汽车吊作为主要安装
+设备,架桥机主梁拼装采用分节进行吊装,整机重量最大的组件为三节主梁同时起
+吊,总重18t,采用2 台50t汽车吊满足吊装要求。"""
 unit_content= {
     "content" : query,
 }

+ 57 - 0
utils_test/RE_Rrank_Test/debug_rerank.py

@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+调试rerank请求差异
+"""
+import requests
+import json
+
+url = "http://192.168.91.253:9004/v1/rerank"
+
+prefix = '<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be "yes" or "no".<|im_end|>\n<|im_start|>user\n'
+suffix = "<|im_end|>\n<|im_start|>assistant\n"
+
+query_template = "{prefix}<Instruct>: {instruction}\n<Query>: {query}\n"
+document_template = "<Document>: {doc}{suffix}"
+
+instruction = "Given a web search query, retrieve relevant passages that answer the query"
+
+query = "什么是大语言模型?"
+candidates = [
+    "Qwen 是阿里巴巴推出的大模型系列。",
+    "今天天气很好,适合出去玩。",
+    "大语言模型是一种基于Transformer的模型",
+    "vegetable"
+]
+
+# 格式化
+formatted_query = query_template.format(prefix=prefix, instruction=instruction, query=query)
+formatted_documents = [document_template.format(doc=doc, suffix=suffix) for doc in candidates]
+
+print("=" * 80)
+print("请求数据:")
+print("=" * 80)
+request_data = {
+    "query": formatted_query,
+    "documents": formatted_documents
+}
+print(json.dumps(request_data, ensure_ascii=False, indent=2))
+
+print("\n" + "=" * 80)
+print("发送请求...")
+print("=" * 80)
+response = requests.post(url, json=request_data).json()
+
+print("\n" + "=" * 80)
+print("响应结果:")
+print("=" * 80)
+print(json.dumps(response, ensure_ascii=False, indent=2))
+
+print("\n" + "=" * 80)
+print("关键信息:")
+print("=" * 80)
+if "results" in response:
+    for idx, result in enumerate(response["results"]):
+        print(f"排名{idx+1}: index={result['index']}, score={result['relevance_score']:.6f}")
+        print(f"  文档预览: {result['document']['text'][:50]}...")

+ 36 - 0
utils_test/RE_Rrank_Test/qwen3_rerenker_test.py

@@ -0,0 +1,36 @@
+import requests
+
+url = "http://192.168.91.253:9004/v1/rerank"
+
+# Please use the query_template and document_template to format the query and
+# document for better reranker results.
+
+prefix = '<|im_start|>system\nJudge whether the Document meets the requirements based on the Query and the Instruct provided. Note that the answer can only be "yes" or "no".<|im_end|>\n<|im_start|>user\n'
+suffix = "<|im_end|>\n<|im_start|>assistant\n<think>\n\n</think>\n\n"
+
+query_template = "{prefix}<Instruct>: {instruction}\n<Query>: {query}\n"
+document_template = "<Document>: {doc}{suffix}"
+
+instruction = (
+    "Given a web search query, retrieve relevant passages that answer the query"
+)
+
+query = "什么是大语言模型?"
+documents = [
+    "Qwen 是阿里巴巴推出的大模型系列。",
+    "今天天气很好,适合出去玩。",
+    "大语言模型是一种基于Transformer的模型",
+    "vegetable"
+]
+
+documents = [
+    document_template.format(doc=doc, suffix=suffix) for doc in documents
+]
+
+response = requests.post(url,
+                         json={
+                             "query": query_template.format(prefix=prefix, instruction=instruction, query=query),
+                             "documents": documents,
+                         }).json()
+
+print(response)