Преглед изворни кода

v0.0.3-审查条文多问题响应

WangXuMing пре 2 месеци
родитељ
комит
d04b9ff3b9

+ 2 - 1
.gitignore

@@ -67,4 +67,5 @@ todo.md
 .R&D
 temp/
 *.json
-test_rawdata/
+test_rawdata/
+temp/AI审查结果.json

BIN
build_graph_app.png


+ 15 - 0
core/base/words_detect/__init__.py

@@ -0,0 +1,15 @@
+"""
+文本块错别字/重复字审查模块
+
+提供基于提示词的一次性 LLM 调用能力。
+"""
+
+from .core.reviewer import WordsErrorReviewer
+from .core.prompt_builder import WordsPromptBuilder
+
+__version__ = "0.1.0"
+__all__ = [
+    "WordsErrorReviewer",
+    "WordsPromptBuilder",
+]
+

+ 39 - 0
core/base/words_detect/config/config.yaml

@@ -0,0 +1,39 @@
+# ============================================================
+# LLM 调用配置
+# ============================================================
+
+llm_api:
+  # API 类型:'openai' 使用 OpenAI 客户端库,'requests' 使用 requests 库(默认)
+  # 推荐使用 'openai' 以获得更好的兼容性和功能支持(如 ModelScope)
+  # api_type: "requests"  # 可选值: "openai" 或 "requests"(推荐使用 "openai" 以获得更好的兼容性)
+
+  # 标准 OpenAI 兼容 API 配置示例:
+  # api_type: "requests"
+  # base_url: "http://172.16.35.50:8000/v1"
+  # model: "Qwen3-4B"
+  # api_key: null
+
+  # ModelScope API 配置(当前使用):
+  # api_type: "openai"
+  # base_url: "https://api-inference.modelscope.cn/v1"
+  # api_key: "ms-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"  # ModelScope Token(占位符,请勿提交真实 Token)
+  # model: "Qwen/Qwen3-8B"
+
+  # 路桥服务器
+  api_type: "requests"
+  base_url: "http://192.168.91.253:9002/v1"
+  model: "Qwen3-8B"
+  api_key: 'dummy'
+
+  timeout: 60  # 请求超时时间(秒)
+  stream: false  # 是否使用流式输出
+
+  # extra_body 参数(用于 ModelScope 等特殊功能)
+  # 对于非流式调用,enable_thinking 必须设置为 false
+  extra_body:
+    enable_thinking: false
+  # 如需启用 thinking 功能(仅流式调用时可用):
+  # extra_body:
+  #   enable_thinking: true
+  #   thinking_budget: 4096
+

+ 75 - 0
core/base/words_detect/core/prompt_builder.py

@@ -0,0 +1,75 @@
+"""
+错误识别提示词构建器
+用于将输入文本与模板组合为完整提示词。
+"""
+
+from pathlib import Path
+from typing import Optional
+import yaml
+
+
+class WordsPromptBuilder:
+    """根据错误识别模板构建提示词"""
+
+    def __init__(self, template_path: Optional[str] = None):
+        """
+        初始化提示词构建器
+
+        Args:
+            template_path: 模板文件路径,默认使用 words_detect/错误识别.yaml
+        """
+        if template_path is None:
+            template_path = Path(__file__).parent.parent / "错误识别.yaml"
+
+        with open(template_path, "r", encoding="utf-8") as f:
+            self.template = yaml.safe_load(f)
+
+    def build_prompt(self, content: str) -> str:
+        """
+        构建完整提示词
+
+        Args:
+            content: 待检查的文本块
+
+        Returns:
+            拼装后的提示词文本
+        """
+        content = content.strip()
+
+        # 1) 错误类别定义
+        prompt_parts = []
+        standards = self.template.get("classification_standards", {})
+        if standards:
+            prompt_parts.append("【错误类别定义】")
+            for name, desc in standards.items():
+                prompt_parts.append(f"{name}: {desc}")
+            prompt_parts.append("")
+
+        # 2) 待检查文本
+        file_toc_tpl = self.template.get("file_toc", "")
+        if "<content>" in file_toc_tpl:
+            file_toc = file_toc_tpl.replace("<content>", content)
+        else:
+            file_toc = f"{file_toc_tpl.strip()}\n{content}"
+        prompt_parts.append(file_toc.strip())
+        prompt_parts.append("")
+
+        # 3) 检查标准
+        review_standard = self.template.get("review_standard", "")
+        if review_standard:
+            prompt_parts.append(review_standard.strip())
+            prompt_parts.append("")
+
+        # 4) 检查要求
+        review_requirements = self.template.get("review_requirements", "")
+        if review_requirements:
+            prompt_parts.append(review_requirements.strip())
+            prompt_parts.append("")
+
+        # 5) 输出格式
+        output_format = self.template.get("output_format", "")
+        if output_format:
+            prompt_parts.append(output_format.strip())
+
+        return "\n".join(prompt_parts)
+

+ 98 - 0
core/base/words_detect/core/reviewer.py

@@ -0,0 +1,98 @@
+"""
+错误识别审查器
+直接使用提示词与文本块调用一次 LLM API。
+"""
+
+import ast
+import json
+import re
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from detect.core.llm_client import LLMClient
+from .prompt_builder import WordsPromptBuilder
+
+
+class WordsErrorReviewer:
+    """错别字/重复字审查器(单次 LLM 调用)"""
+
+    def __init__(
+        self,
+        template_path: Optional[str] = None,
+        config_path: Optional[str] = None,
+    ):
+        """
+        初始化审查器
+
+        Args:
+            template_path: 提示词模板路径,默认使用 words_detect/错误识别.yaml
+            config_path: LLM 配置路径,默认使用 words_detect/config/config.yaml
+        """
+        if template_path is None:
+            template_path = Path(__file__).parent.parent / "错误识别.yaml"
+        if config_path is None:
+            config_path = Path(__file__).parent.parent / "config" / "config.yaml"
+
+        self.prompt_builder = WordsPromptBuilder(template_path)
+        self.llm_client = LLMClient(config_path)
+
+    def review(self, content: str) -> Dict[str, Any]:
+        """
+        执行文本错误审查(单次 API 调用)
+
+        Args:
+            content: 待检查文本块
+
+        Returns:
+            {
+              "prompt": 构建的提示词,
+              "result": LLM 原始返回文本,
+              "parsed_result": 解析得到的 JSON(若解析失败则为 None)
+            }
+        """
+        prompt = self.prompt_builder.build_prompt(content)
+        result_text = self.llm_client.review(prompt)
+        parsed = self._extract_json_from_text(result_text)
+
+        return {
+            "prompt": prompt,
+            "result": result_text,
+            "parsed_result": parsed,
+        }
+
+    def _extract_json_from_text(self, text: str) -> Optional[Any]:
+        """
+        从 LLM 返回文本中提取 JSON 对象
+        """
+        if not text:
+            return None
+
+        candidates = []
+
+        # 1) 直接尝试整体解析
+        stripped = text.strip()
+        candidates.append(stripped)
+
+        # 2) ```json ... ``` 代码块
+        fence_pattern = r"```(?:json)?\s*(\{[\s\S]*?\})\s*```"
+        match = re.search(fence_pattern, text, re.DOTALL)
+        if match:
+            candidates.append(match.group(1))
+
+        # 3) 最外层花括号或方括号截取(支持列表/对象)
+        container_pattern = r"(\{[\s\S]*\}|\[[\s\S]*\])"
+        match = re.search(container_pattern, text)
+        if match:
+            candidates.append(match.group(1))
+
+        for candidate in candidates:
+            try:
+                return json.loads(candidate)
+            except json.JSONDecodeError:
+                # 兼容 Python 字面量格式 (列表/元组)
+                try:
+                    return ast.literal_eval(candidate)
+                except Exception:
+                    continue
+        return None
+

+ 65 - 0
core/base/words_detect/examples/example.py

@@ -0,0 +1,65 @@
+"""
+使用示例:错别字/重复字审查
+"""
+
+import json
+from words_detect import WordsErrorReviewer
+
+
+example_text = """
+2、项目主要管理人员环保职责
+  表10 项目主要管理人员环水保职责一览表
+
+  序号
+  职务
+  主要环保职责
+  备注
+  1
+  项目经理/项目书记
+  项目环保工作的第一责任人,对项目环保工作负总责。
+  3
+  总工程师
+  对项目环保科技创新和技术管理工作负直接领导责任;编制施工组织设计、专项方案时应包含环保技术内容,对方案环保措施进行优化。
+  4
+  副经理
+  对分管工区环保工作负直接领导责任,阻值分管工区开展环保教育培训,解决分管范围内存在的有关环保问题,制定预防和整改措施。
+  5
+  安全总监
+  对项目环保工作负监督管理责任;统筹邪教、监督管理项目环保工作,建立健全项目环保管理制度和体系;组织开展项目环保管理监督、检查工作。
+  6
+  工程管理部处长长
+  组织生产活动同时应统筹考虑防治污染的设施,防治污染的设施应与主体工程同时设计、同时施工、同时投入使用;编制专项施工方案时,应系统考虑污染防治措施,努力实现废水、废渣、废气、噪声等依法合规排放。
+  7
+  安全环保处长
+  认真贯彻执行国家环保方针政策、法律法规,参与拟定环保监督管理规章制度;监督、检查项目各部门、工区依法合规开展环保工作,提出整改意见并监督落实。
+  8
+  物资设备处长
+  负责项目生态环境保护管理工作,贯彻执行国家环保方针政策、法律法规,项目决策部署和工作安排;制定、监督实施项目环保监督管理规章制度、标准、工作计划;负责项目环保监督数据的统计分析和报送工作。
+  3、环水保风险控制要点
+  四川公路桥梁建设集团有限公司镇广C4 项目经理部
+  JQ220t-40m 架桥机安装及拆除专项施工方案第63 页
+"""
+
+
+def main():
+    reviewer = WordsErrorReviewer()
+    result = reviewer.review(example_text)
+
+    print("=" * 60)
+    print("审查提示词:")
+    print("=" * 60)
+    print(result["prompt"])
+
+    print("\n" + "=" * 60)
+    print("模型原始返回:")
+    print("=" * 60)
+    print(result["result"])
+
+    if result["parsed_result"] is not None:
+        print("\n解析后的 JSON:")
+        print(json.dumps(result["parsed_result"], ensure_ascii=False, indent=2))
+
+
+if __name__ == "__main__":
+    main()
+

+ 63 - 0
core/base/words_detect/错误识别.yaml

@@ -0,0 +1,63 @@
+# ============================================================
+# 错误识别提示词模板
+# ============================================================
+
+# 分类标准映射
+# 说明:定义各错误类别的判定标准
+classification_standards:
+  错别字: "实体、动词、名词等概念字词的使用不符合句子语境的错误。"
+  重复字词: "组词时多字(如“工工程简介”多 1 个“工”字),或某词语在一句话中无语义、语法依据重复出现的错误。"
+
+# 组件1: 待检查文本
+# 说明:需要进行错误识别的完整文本内容(运行时将 <content> 替换为实际文本块)
+file_toc: |-
+  【待检查文本】
+  <content>
+
+# 组件2: 检查标准
+# 说明:两类错误的具体识别要求
+review_standard: |-
+  【检查标准】
+  1. 错别字:识别语义、语境、搭配错误导致的错词、错字。
+  2. 重复字词:识别多余的字、重复出现且无语义依据的字、词或词组。
+
+# 组件3: 检查要求
+# 说明:明确如何执行检查和判定
+review_requirements: |-
+  【检查任务要求】
+  
+  ## 检查原则
+  1. 精准识别:只标注确凿的错别字或重复字词,避免过度推测。
+  2. 分类清晰:两类错误分别归入对应字段,不得混淆。
+  3. 去重原则:同一错误实体的同一错误仅标注 1 次。
+  4. 无误返回:若未发现错误,返回空 JSON 对象。
+  
+  ## 检查细致程度
+  - 逐句检查:逐句审查语义与用词是否匹配。
+  - 语境判断:结合上下文判断是否存在错词或多余字词。
+  - 重复确认:确认重复字词是否确为冗余,而非强调或并列。
+  
+  ## 判定标准
+  - 错别字:字词与语境不符或使用错误。
+  - 重复字词:多余字、词或词组导致语句冗余。
+  
+  ## 特别注意
+  - 忽略转义字符带来的问题。
+  - 若无任何错误,返回空 JSON 对象 {}。
+
+# 组件4: 输出格式
+# 说明:规范检查结果的返回格式
+output_format: |-
+  【输出格式规范】
+  
+  ## 格式要求
+  返回列表,每个元素为四元组:
+  [
+    ("错误实体A", "错误类型A", "错误问题A", "正确示例A"),
+    ("错误实体B", "错误类型B", "错误问题B", "正确示例B"),
+  ]
+  
+  ## 注意事项
+  - 错误类型仅可为“错别字”或“重复字词”。
+  - 若无错误,返回空列表 []。
+  /no_think

+ 52 - 41
core/construction_review/component/ai_review_engine.py

@@ -69,7 +69,7 @@ class Stage(Enum):
     """工作流状态"""
     BASIC = {
         'reviewer_type':'basic',
-        'sensitive': 'sensitive_word_check',
+        'grammar': 'grammar_check',
         'semantic': 'semantic_logic_check',
         'completeness': 'completeness_check',
         'timeliness': 'timeliness_check',
@@ -102,14 +102,6 @@ class AIReviewEngine(BaseReviewer):
         self.semaphore = asyncio.Semaphore(max_concurrent_reviews)
         self.milvus_collection = config_handler.get('milvus', 'MILVUS_COLLECTION', 'default')
 
-    def set_review_location_label(self, location_label: str):
-        """
-        设置审查位置标签
-
-        Args:
-            location_label: 位置标签字符串
-        """
-        self.review_location_label = location_label
 
     def _process_review_result(self, result):
         """
@@ -151,7 +143,7 @@ class AIReviewEngine(BaseReviewer):
         return wrapped_check()
 
     async def basic_compliance_check(self, trace_id_idx: str, unit_content: Dict[str, Any],
-                                   stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                                   review_location_label: str = None) -> Dict[str, Any]:
         """
         基础合规性检查
 
@@ -172,12 +164,12 @@ class AIReviewEngine(BaseReviewer):
 
         async def check_with_semaphore(check_func, *args):
             async with self.semaphore:
-                return await check_func(*args, stage_name=stage_name, state=state, current_progress=current_progress)
+                return await check_func(*args)
 
         basic_tasks = [
-            check_with_semaphore(self.check_grammar, trace_id_idx, review_content, review_references),
-            check_with_semaphore(self.check_semantic_logic, trace_id_idx, review_content, review_references),
-            check_with_semaphore(self.check_completeness, trace_id_idx, review_content, review_references)
+            check_with_semaphore(self.check_grammar, trace_id_idx, review_content, review_references,review_location_label),
+            check_with_semaphore(self.check_semantic_logic, trace_id_idx, review_content, review_references,review_location_label),
+            check_with_semaphore(self.check_completeness, trace_id_idx, review_content, review_references,review_location_label)
         ]
 
 
@@ -196,9 +188,9 @@ class AIReviewEngine(BaseReviewer):
         }
 
     async def technical_compliance_check(self, trace_id_idx: str, unit_content: Dict[str, Any],
-                                      stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                                      review_location_label: str = None) -> Dict[str, Any]:
         """
-        技术性合规检查
+        技术性合规检查(包含RAG增强审查)
 
         Args:
             trace_id_idx: 追踪ID索引
@@ -211,16 +203,22 @@ class AIReviewEngine(BaseReviewer):
             Dict[str, Any]: 技术性合规检查结果
         """
         review_content = unit_content['content']
-        review_references = unit_content.get('review_references')
         logger.info(f"开始技术性合规检查,内容长度: {len(review_content)}")
+
+        # 先执行RAG增强检索,获取相关标准文档作为参考
+        rag_result = self.rag_enhanced_check(unit_content)
+        review_references = rag_result.get('text_content', '')
+        bfp_file_name = rag_result.get('file_name', '')
+
         async def check_with_semaphore(check_func, *args):
             async with self.semaphore:
-                return await check_func(*args, stage_name=stage_name, state=state, current_progress=current_progress)
+                return await check_func(*args)
 
+        # 并发执行技术性检查任务,传入RAG增强的参考上下文和bfp_file_name
         technical_tasks = [
-            check_with_semaphore(self.check_mandatory_standards, trace_id_idx, review_content,review_references),
-            check_with_semaphore(self.check_design_values, trace_id_idx, review_content,review_references),
-            check_with_semaphore(self.check_technical_parameters, trace_id_idx, review_content,review_references)
+            check_with_semaphore(self.check_mandatory_standards, trace_id_idx, review_content, review_references, bfp_file_name,review_location_label),
+            check_with_semaphore(self.check_design_values, trace_id_idx, review_content, review_references, bfp_file_name,review_location_label),
+            check_with_semaphore(self.check_technical_parameters, trace_id_idx, review_content, review_references, bfp_file_name,review_location_label)
         ]
 
         mandatory_result, design_value_result, technical_param_result = await asyncio.gather(*technical_tasks, return_exceptions=True)
@@ -237,7 +235,7 @@ class AIReviewEngine(BaseReviewer):
             'overall_score': self._calculate_technical_score(mandatory_result, design_value_result, technical_param_result)
         }
 
-    async def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
+    def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
         """
         RAG增强审查
 
@@ -249,15 +247,30 @@ class AIReviewEngine(BaseReviewer):
         """
         # 向量检索
         vector_results = retrieval_manager.multi_stage_recall(self.milvus_collection, unit_content['content'])
-        vector_results[0]
-        return {
-            'vector_search': vector_results,
 
+        # 检查是否有检索结果
+        if not vector_results:
+            logger.warning("RAG检索未返回任何结果")
+            return {
+                'vector_search': [],
+                'retrieval_status': 'no_results'
+            }
+
+        # 获取第一个结果的信息
+        first_result = vector_results[0]
+        file_name = first_result['metadata'].get('file_name', 'unknown')
+        text_content = first_result['text_content']
+
+        return {
+        'file_name': file_name,
+        'text_content': text_content,
+        'metadata': first_result['metadata']
         }
+        
 
 
     async def check_grammar(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
-                          stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                          review_location_label: str = None) -> Dict[str, Any]:
         """
         词句语法检查
 
@@ -273,13 +286,13 @@ class AIReviewEngine(BaseReviewer):
             Dict[str, Any]: 语法检查结果
         """
         reviewer_type = Stage.BASIC.value['reviewer_type']
-        prompt_name = Stage.BASIC.value['sensitive']
+        prompt_name = Stage.BASIC.value['grammar']
         trace_id = prompt_name+trace_id_idx
         return await self.review("词句语法检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               stage_name, state, current_progress)
+                               None, review_location_label)
 
     async def check_semantic_logic(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
-                                 stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                                 review_location_label: str = None) -> Dict[str, Any]:
         """
         语义逻辑检查
 
@@ -287,9 +300,7 @@ class AIReviewEngine(BaseReviewer):
             trace_id_idx: 追踪ID索引
             review_content: 审查内容
             review_references: 审查参考信息
-            stage_name: 阶段名称
-            state: 状态字典
-            current_progress: 当前进度
+            review_location_label: 审查位置标签
 
         Returns:
             Dict[str, Any]: 语义逻辑检查结果
@@ -298,10 +309,10 @@ class AIReviewEngine(BaseReviewer):
         prompt_name = Stage.BASIC.value['semantic']
         trace_id = prompt_name+trace_id_idx
         return await self.review("语义逻辑检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               stage_name, state, current_progress)
+                               None, review_location_label)
 
     async def check_completeness(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
-                               stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                               review_location_label: str = None) -> Dict[str, Any]:
         """
         完整性检查
 
@@ -320,10 +331,10 @@ class AIReviewEngine(BaseReviewer):
         prompt_name = Stage.BASIC.value['completeness']
         trace_id = prompt_name+trace_id_idx
         return await self.review("完整性检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               stage_name, state, current_progress)
+                               None, review_location_label)
 
     async def check_mandatory_standards(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
-                                        stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                                        bfp_file_name: str = None, review_location_label: str = None) -> Dict[str, Any]:
         """
         强制性标准检查
 
@@ -342,10 +353,10 @@ class AIReviewEngine(BaseReviewer):
         prompt_name = Stage.TECHNICAL.value['mandatory']
         trace_id = prompt_name+trace_id_idx
         return await self.review("强制性标准检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               stage_name, state, current_progress)
+                               bfp_file_name, review_location_label)
 
     async def check_design_values(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
-                                  stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                                  bfp_file_name: str = None, review_location_label: str = None) -> Dict[str, Any]:
         """
         设计值检查
 
@@ -364,10 +375,10 @@ class AIReviewEngine(BaseReviewer):
         prompt_name = Stage.TECHNICAL.value['design']
         trace_id = prompt_name+trace_id_idx
         return await self.review("设计值检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               stage_name, state, current_progress)
+                               bfp_file_name, review_location_label)
 
     async def check_technical_parameters(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
-                                         stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
+                                         bfp_file_name: str = None, review_location_label: str = None) -> Dict[str, Any]:
         """
         技术参数检查
 
@@ -386,7 +397,7 @@ class AIReviewEngine(BaseReviewer):
         prompt_name = Stage.TECHNICAL.value['technical']
         trace_id = prompt_name+trace_id_idx
         return await self.review("技术参数检查", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               stage_name, state, current_progress)
+                               bfp_file_name, review_location_label)
 
     # RAG检索增强
 

+ 3 - 1
core/construction_review/component/document_processor.py

@@ -5,6 +5,7 @@
 """
 
 import io
+import json
 import os
 import tempfile
 from pathlib import Path
@@ -512,7 +513,8 @@ class DocumentProcessor:
                     result['toc_info'] = raw_content['toc_info']
                 if 'classification' in raw_content:
                     result['classification'] = raw_content['classification']
-
+            with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
+                json.dump(result, f, ensure_ascii=False, indent=4)
             return result
 
         except Exception as e:

+ 21 - 29
core/construction_review/component/reviewers/base_reviewer.py

@@ -33,10 +33,11 @@ class BaseReviewer(ABC):
         self.model_client = generate_model_client
         self.prompt_loader = prompt_loader
         self.review_location_label = review_location_label
+        self.bfp_file_name = ""
     
     #@obverse
     async def review(self, name: str, trace_id: str, reviewer_type: str, prompt_name: str, review_content: str, review_references: str = None,
-                    stage_name: str = None, state: dict = None, current_progress: int = None) -> ReviewResult:
+                    bfp_file_name: str = None, review_location_label: str = None) -> ReviewResult:
         """
         执行审查
 
@@ -60,25 +61,16 @@ class BaseReviewer(ABC):
         """
         start_time = time.time()
         try:
-            # 添加进度更新
-            # 安全检查:确保所有必要参数都存在才执行进度更新
-            # if state and state.get("progress_manager") and stage_name and current_progress is not None:
-            #     asyncio.create_task(
-            #         state["progress_manager"].update_stage_progress(
-            #             callback_task_id=state["callback_task_id"],
-            #             stage_name=stage_name,
-            #             current=current_progress,
-            #             status="processing",
-            #             message=f"开始进行:{name}"
-            #         )
-            #     )
             logger.info(f"开始执行 {name} 审查,trace_id: {trace_id},内容长度: {len(review_content)}")
             prompt_kwargs = {}
             prompt_kwargs["review_content"] = review_content
             prompt_kwargs["review_references"] = review_references or ""
             # 添加审查位置标签到提示词参数中
-            if hasattr(self, 'review_location_label') and self.review_location_label:
-                prompt_kwargs["review_location_label"] = self.review_location_label
+            if review_location_label:
+                prompt_kwargs["review_location_label"] = review_location_label
+            # 添加bfp_file_name到提示词参数中
+            if bfp_file_name:
+                prompt_kwargs["bfp_file_name"] = bfp_file_name
             task_prompt_info = {
                 "task_prompt": self.prompt_loader.get_prompt_template(reviewer_type, prompt_name, **prompt_kwargs),
                 "task_name": name
@@ -104,20 +96,20 @@ class BaseReviewer(ABC):
                 'timestamp': time.time()
             }
 
-            # 推送审查完成信息
-            if state and state.get("progress_manager"):
-                # 使用专门的事件类型,避免覆盖主进度的processing_flag
-                asyncio.create_task(
-                    state["progress_manager"].update_stage_progress(
-                        callback_task_id=state["callback_task_id"],
-                        stage_name=stage_name,
-                        current=None,  # 明确不更新current,保持主流程进度
-                        status="processing",
-                        message=f"{name} 审查完成, 耗时: {result.execution_time:.2f}s",
-                        issues=[review_result_data],
-                        event_type="processing"  # 使用专门的事件类型
-                    )
-                )
+            # # 推送审查完成信息
+            # if state and state.get("progress_manager"):
+            #     # 使用专门的事件类型,避免覆盖主进度的processing_flag
+            #     asyncio.create_task(
+            #         state["progress_manager"].update_stage_progress(
+            #             callback_task_id=state["callback_task_id"],
+            #             stage_name=stage_name,
+            #             current=None,  # 明确不更新current,保持主流程进度
+            #             status="processing",
+            #             message=f"{name} 审查完成, 耗时: {result.execution_time:.2f}s",
+            #             issues=[review_result_data],
+            #             event_type="processing"  # 使用专门的事件类型
+            #         )
+            #     )
             logger.info(f"{name} 审查完成, 耗时: {result.execution_time:.2f}s")
 
             return result

+ 63 - 1
core/construction_review/component/reviewers/prompt/basic_reviewers.yaml

@@ -1,6 +1,68 @@
 # 基础合规性审查提示词配置
 
-# 1.1 语义逻辑检查功能
+# 1.1 词句语法检查功能
+grammar_check:
+  system_prompt: |
+    你是施工方案词句语法审查专家,负责检查文本中的错别字和重复字词错误。
+
+    # 分类标准映射
+    classification_standards:
+      错别字: "实体、动词、名词等概念字词的使用不符合句子语境的错误。"
+      重复字词: "组词时多字(如"工工程简介"多 1 个"工"字),或某词语在一句话中无语义、语法依据重复出现的错误。"
+
+    # 检查标准
+    review_standard: 
+      【检查标准】
+      1. 错别字:识别语义、语境、搭配错误导致的错词、错字。
+      2. 重复字词:识别多余的字、重复出现且无语义依据的字、词或词组。
+
+    # 检查要求
+    review_requirements: 
+      【检查任务要求】
+
+      ## 检查原则
+      1. 精准识别:只标注确凿的错别字或重复字词,避免过度推测。
+      2. 分类清晰:两类错误分别归入对应字段,不得混淆。
+      3. 去重原则:同一错误实体的同一错误仅标注 1 次。
+
+      ## 检查细致程度
+      - 逐句检查:逐句审查语义与用词是否匹配。
+      - 语境判断:结合上下文判断是否存在错词或多余字词。
+      - 重复确认:确认重复字词是否确为冗余,而非强调或并列。
+
+      ## 判定标准
+      - 错别字:字词与语境不符或使用错误。
+      - 重复字词:多余字、词或词组导致语句冗余。
+
+      ## 风险等级分类
+      - 高风险:影响审查结论、可能导致法律问题或严重安全隐患
+      - 中风险:影响专业表达、可能导致理解偏差或一般性问题
+      - 低风险:形式问题、不影响实质内容和安全
+
+    审查参考:
+    {review_references}
+
+  user_prompt_template: |
+    请审查以下内容的词句语法错误,重点关注错别字和重复字词:
+
+    【待检查文本】
+    {review_content}
+
+    输出格式:必须严格按照以下标准JSON格式输出审查结果:
+    如果未发现问题,请输出:无明显问题
+    如果发现问题,请按以下格式输出:
+    location字段直接输出原字段内容,不得猜测
+    ```json
+    {{
+      "issue_point": "问题标题描述",
+      "location": "{review_location_label}",
+      "suggestion": "具体的修改建议内容",
+      "reason": "问题的原因分析和依据说明",
+      "risk_level": ""
+    }}
+    ```
+
+# 1.2 语义逻辑检查功能
 semantic_logic_check:
   system_prompt: |
     你是施工方案语义逻辑审查专家,负责检查表述一致性和逻辑清晰度。

+ 3 - 0
core/construction_review/component/reviewers/prompt/technical_reviewers.yaml

@@ -31,6 +31,7 @@ mandatory_standards_check:
       "issue_point": "问题标题描述",
       "location": "{review_location_label}",
       "suggestion": "具体的修改建议内容",
+      "reference_source": "{bfp_file_name}",
       "reason": "问题的原因分析和依据说明",
       "risk_level": ""
     }}
@@ -67,6 +68,7 @@ technical_parameters_check:
       "issue_point": "问题标题描述",
       "location": "{review_location_label}",
       "suggestion": "具体的修改建议内容",
+      "reference_source": "{bfp_file_name}",
       "reason": "问题的原因分析和依据说明",
       "risk_level": ""
     }}
@@ -103,6 +105,7 @@ design_values_check:
       "issue_point": "问题标题描述",
       "location": "{review_location_label}",
       "suggestion": "具体的修改建议内容",
+      "reference_source": "{bfp_file_name}",
       "reason": "问题的原因分析和依据说明",
       "risk_level": ""
     }}

+ 3 - 2
core/construction_review/component/reviewers/utils/__init__.py

@@ -4,7 +4,8 @@
 """
 
 from .prompt_loader import PromptLoader
-
+from .inter_tool import InterTool
 __all__ = [
-    'PromptLoader'
+    'PromptLoader',
+    'InterTool'
 ]

+ 376 - 0
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -0,0 +1,376 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+InterTool 类 - AI 审查工具类
+
+负责 AI 审查结果的辅助计算和结果处理功能。
+"""
+
+import json
+import re
+from typing import Optional, Dict, Any, List, TypedDict
+from langchain_core.messages import BaseMessage
+from foundation.observability.logger.loggering import server_logger as logger
+
# Constant definitions
# Internal risk-level keys mapped to their Chinese display labels.
# NOTE(review): RISK_LEVELS is not referenced in this module's visible code -- confirm external callers.
RISK_LEVELS = {"high": "高风险", "medium": "中风险", "low": "低风险"}
# Fallback level returned by _calculate_overall_risk when score extraction fails.
DEFAULT_RISK_LEVEL = "medium"
+
+
class AIReviewState(TypedDict):
    """Shared state dict passed between AI-review workflow nodes."""

    file_id: str                                # ID of the file under review
    callback_task_id: str                       # task ID used in callbacks and issue IDs
    file_name: str                              # original file name (display/reporting)
    user_id: str                                # owner of the review request
    structured_content: Dict[str, Any]          # parsed document structure to review
    review_results: Optional[Dict[str, Any]]    # aggregated results; None until produced
    current_stage: str                          # name of the stage currently executing
    status: str                                 # overall workflow status flag
    error_message: Optional[str]                # set when a stage fails; None otherwise
    progress_manager: Optional[Any]             # progress reporter -- concrete type not visible here
    # Message log (used for LangGraph state tracking)
    messages: List[BaseMessage]
+
+
+class InterTool:
+    """AI审查工具类 - 负责辅助计算和结果处理"""
+
+    def _calculate_overall_risk(self, basic_result: Dict, technical_result: Dict, rag_result: Dict) -> str:
+        """
+        计算总体风险等级
+
+        Args:
+            basic_result: 基础合规性审查结果,包含overall_score字段
+            technical_result: 技术性审查结果,包含overall_score字段
+            rag_result: RAG增强审查结果(当前未使用)
+
+        Returns:
+            str: 风险等级 ("low", "medium", "high")
+
+        Note:
+            风险等级计算逻辑:
+            - low: 基础和技术审查都达到90分以上
+            - medium: 基础和技术审查都达到70分以上
+            - high: 其他情况
+            异常情况下返回默认风险等级medium
+        """
+        try:
+            # 基于各种审查结果计算风险等级
+            # 风险等级计算逻辑:基础和技术审查都达到90分以上为低风险,70分以上为中风险,否则为高风险
+            basic_score = basic_result.get('overall_score', 0)
+            technical_score = technical_result.get('overall_score', 0)
+
+            if basic_score >= 90 and technical_score >= 90:
+                return "low"
+            elif basic_score >= 70 and technical_score >= 70:
+                return "medium"
+            else:
+                return "high"
+        except (KeyError, TypeError, ValueError) as e:
+            logger.warning(f"风险等级计算异常: {str(e)},使用默认风险等级")
+            return DEFAULT_RISK_LEVEL
+
+    def _aggregate_results(self, successful_results: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
+        """
+        汇总审查结果(issues格式)
+
+        Args:
+            successful_results: 成功的审查结果列表(issues格式),每个单元返回一个issues列表
+
+        Returns:
+            Dict[str, Any]: 汇总后的统计信息,包含以下字段:
+                - risk_stats: 各风险等级的数量统计 {"low": 0, "medium": 0, "high": 0}
+                - total_reviewed: 成功审查的总数量
+                - total_issues: 总问题数量
+
+        Note:
+            当输入为空时返回空字典,异常时记录错误并返回空字典
+        """
+        try:
+            if not successful_results:
+                return {}
+
+            # 计算风险等级统计和问题总数
+            risk_stats = {"low": 0, "medium": 0, "high": 0}
+            total_issues = 0
+
+            for unit_issues in successful_results:
+                # 每个unit_issues是一个issues列表
+                if unit_issues and isinstance(unit_issues, list):
+                    total_issues += len(unit_issues)
+
+                    # 统计每个issue中的风险等级
+                    for issue in unit_issues:
+                        if isinstance(issue, dict):
+                            # issue格式: {issue_id: {risk_summary: {...}}}
+                            for issue_data in issue.values():
+                                risk_summary = issue_data.get('risk_summary', {})
+                                max_risk = risk_summary.get('max_risk_level', '0')
+
+                                if max_risk in risk_stats:
+                                    risk_stats[max_risk] += 1
+                                elif max_risk == '0':
+                                    risk_stats['low'] += 1  # 无风险视为低风险
+
+            return {
+                'risk_stats': risk_stats,
+                'total_reviewed': len(successful_results),
+                'total_issues': total_issues
+            }
+        except (ZeroDivisionError, KeyError, TypeError) as e:
+            logger.error(f"结果汇总失败: {str(e)}")
+            return {}
+
+    def _format_review_results_to_issues(self, callback_task_id: str, unit_index: int, review_location_label: str,
+                                        unit_content: Dict[str, Any], basic_result: Dict[str, Any],
+                                        technical_result: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """
+        将审查结果格式化为issues结构
+
+        Args:
+            callback_task_id: 回调任务ID,用于生成唯一issue_id
+            unit_index: 单元索引,用于生成唯一issue_id
+            review_location_label: 审查位置标签,如"第3页:第一章"
+            unit_content: 单元内容,包含原始文本等信息
+            basic_result: 基础合规性审查结果,包含各项检查结果
+            technical_result: 技术性审查结果,包含技术标准检查结果
+
+        Returns:
+            List[Dict]: 格式化后的issues列表,每个issue包含:
+                - issue_id: 唯一标识符,格式为"{callback_task_id}-{risk_level}-{unit_index}"
+                - metadata: 元数据,包含审查位置和原始内容
+                - risk_summary: 风险摘要,包含最高风险等级和问题数量统计
+                - review_lists: 详细审查问题列表
+
+        Note:
+            自动跳过overall_score字段,提取所有检查项的详细结果
+            支持风险等级统计和最高风险等级确定
+        """
+        issues = []
+        review_lists = []
+        risk_count = {"high": 0, "medium": 0, "low": 0}
+        max_risk_level = "low"
+
+        # 合并所有审查结果
+        all_results = {}
+        if basic_result:
+            all_results.update(basic_result)
+        if technical_result:
+            all_results.update(technical_result)
+
+        logger.info(f"开始格式化审查结果,合并后结果: {list(all_results.keys())}")
+
+        for check_key, check_result in all_results.items():
+            logger.info(f"处理检查项: {check_key}, 结果类型: {type(check_result)}")
+
+            if check_key == 'overall_score':  # 跳过分数字段
+                logger.info(f"跳过分数字段: {check_key}")
+                continue
+
+            if check_result and "details" in check_result and "response" in check_result["details"]:
+                response = check_result["details"]["response"]
+                check_name = check_result["details"].get("name", check_key)
+                logger.info(f"解析检查项 {check_name} 的响应,长度: {len(response)}")
+                check_issues = self._parse_ai_review_response(response, check_name)
+                review_lists.extend(check_issues)
+            else:
+                logger.warning(f"检查项 {check_key} 格式不符合要求,缺少details或response字段")
+                logger.warning(f"check_result内容: {check_result}")
+
+        # 统计风险等级
+        for issue in review_lists:
+            risk_level = issue.get("risk_info", {}).get("risk_level", "low")
+            if risk_level in risk_count:
+                risk_count[risk_level] += 1
+
+        # 确定最高风险等级
+        if risk_count["high"] > 0:
+            max_risk_level = "high"
+        elif risk_count["medium"] > 0:
+            max_risk_level = "medium"
+
+        # 如果有审查结果,创建issue
+        if review_lists:
+            issue_id = f"{callback_task_id}-{max_risk_level if max_risk_level else '0'}-{unit_index}"
+            issue = {
+                issue_id: {
+                    "metadata": {
+                        "review_location_label": review_location_label,
+                        "original_content": unit_content.get('content', '')
+                    },
+                    "risk_summary": {
+                        "max_risk_level": max_risk_level if max_risk_level else '0',
+                        "risk_count": risk_count if risk_count else 0
+                    },
+                    "review_lists": review_lists
+                }
+            }
+            issues.append(issue)
+
+        return issues
+
+    def _parse_ai_review_response(self, response: str, check_name: str) -> List[Dict[str, Any]]:
+        """
+        解析AI审查的JSON格式响应
+
+        Args:
+            response: AI审查响应内容
+            check_name: 检查项名称(如"词句语法检查")
+
+        Returns:
+            List[Dict]: 解析后的审查结果列表,包含check_item、check_result、exist_issue、risk_info等字段
+
+        Note:
+            支持识别"无明显问题"等关键词,自动设置风险等级
+            解析新的英文键名JSON格式: issue_point, location, suggestion, reason, risk_level
+        """
+        review_lists = []
+
+        try:
+            # 1. 检查无问题情况
+            if any(keyword in response for keyword in ["无明显问题", "无问题", "符合要求"]):
+                return [{
+                    "check_item": check_name,
+                    "check_result": "无明显问题",
+                    "exist_issue": False,
+                    "risk_info": {"risk_level": "low"}
+                }]
+
+            # 2. 尝试提取和解析JSON
+            json_data = self._extract_json_data(response)
+            if json_data:
+                # 处理数组格式 - 保存调试数据
+                if isinstance(json_data, list):
+                    for issue_data in json_data:
+                        review_lists.append(self._create_issue_item(issue_data, check_name))
+                # 处理对象格式
+                elif isinstance(json_data, dict):
+                    review_lists.append(self._create_issue_item(json_data, check_name))
+
+            # 3. 如果JSON解析失败,回退到文本解析
+            if not review_lists:
+                risk_level = self._determine_risk_level(response)
+                review_lists.append({
+                    "check_item": check_name,
+                    "check_result": response,
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": risk_level}
+                })
+
+        except Exception as e:
+            logger.error(f"解析AI审查响应失败: {str(e)}")
+            review_lists.append({
+                "check_item": check_name,
+                "check_result": response,
+                "exist_issue": True,
+                "risk_info": {"risk_level": "low"}
+            })
+
+        return review_lists
+
+    def _extract_json_data(self, response: str):
+        """从响应中提取JSON数据,合并所有提取策略"""
+        try:
+            # 尝试直接解析整个响应
+            response_stripped = response.strip()
+            if ((response_stripped.startswith('{') and response_stripped.endswith('}')) or
+                (response_stripped.startswith('[') and response_stripped.endswith(']'))):
+                try:
+                    return json.loads(response_stripped)
+                except json.JSONDecodeError:
+                    pass
+
+            # 尝试从代码块中提取JSON
+            code_block_patterns = [
+                r'```json\s*(\[[\s\S]*?\]|\{[\s\S]*?\})\s*```',
+                r'```(?:json)?\s*(\{[\s\S]*?\})\s*```',
+                r'```json\s*(.*?)\s*```'
+            ]
+
+            for pattern in code_block_patterns:
+                matches = re.findall(pattern, response, re.DOTALL)
+                if matches:
+                    for match in matches:
+                        try:
+                            return json.loads(match.strip())
+                        except json.JSONDecodeError:
+                            continue
+
+            # 尝试模式匹配提取JSON
+            json_patterns = [
+                r"(\{[\s\S]*\}|\[[\s\S]*\])",  # 容器模式
+                r'\[\s*\{.*?\}\s*\]',           # 数组模式
+                r'\{[^{}]*"issue_point"[^{}]*\}'  # 对象模式
+            ]
+
+            for pattern in json_patterns:
+                matches = re.findall(pattern, response, re.DOTALL)
+                for match in matches:
+                    try:
+                        return json.loads(match)
+                    except json.JSONDecodeError:
+                        continue
+
+        except Exception as e:
+            logger.warning(f"JSON提取失败: {str(e)}")
+
+        return None
+
+    def _create_issue_item(self, issue_data: dict, check_name: str) -> Dict[str, Any]:
+        """创建单个审查问题项"""
+        risk_level = self._determine_risk_level(issue_data.get("risk_level", ""))
+
+        return {
+            "check_item": check_name,
+            "check_result": issue_data,
+            "exist_issue": True,
+            "risk_info": {"risk_level": risk_level}
+        }
+
+    def _determine_risk_level(self, risk_input) -> str:
+        """确定风险等级,支持字符串直接输入或文本解析"""
+        if not risk_input:
+            return "medium"
+
+        # 如果输入的是文本,尝试从中提取风险等级
+        if isinstance(risk_input, str) and len(risk_input) > 50:  # 假设是长文本
+            lines = risk_input.split('\n')
+            for line in lines:
+                line = line.strip()
+                if line.startswith('**风险等级**:') or line.startswith('**risk_level**:'):
+                    if ':' in line:
+                        risk_str = line.split(':', 1)[1].strip()
+                    elif ':' in line:
+                        risk_str = line.split(':', 1)[1].strip()
+                    else:
+                        continue
+                    return self._determine_risk_level(risk_str)
+
+        # 处理直接的风险等级字符串
+        risk_str = risk_input.lower()
+        if "高" in risk_str or "high" in risk_str:
+            return "high"
+        elif "中" in risk_str or "medium" in risk_str:
+            return "medium"
+        else:
+            return "low"
+
+    def _check_ai_review_result(self, state: AIReviewState) -> str:
+        """
+        检查AI审查结果状态
+
+        Args:
+            state: AI审查工作流状态
+
+        Returns:
+            str: 状态标识 ("success" 或 "error")
+
+        Note:
+            根据状态中是否存在错误信息来判断审查是否成功
+        """
+        if state.get("error_message"):
+            return "error"
+        return "success"

+ 51 - 418
core/construction_review/workflows/ai_review_workflow.py

@@ -53,6 +53,7 @@ from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
 from foundation.observability.logger.loggering import server_logger as logger
 from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from ..component import AIReviewEngine
+from ..component.reviewers.utils.inter_tool import InterTool
 
 # 常量定义
 DEFAULT_SLICE_START_INDEX = 30
@@ -291,7 +292,7 @@ class AIReviewWorkflow:
             # 构建符合格式的review_results
             review_results = {
                 "callback_task_id": state["callback_task_id"],
-                "file_name": state.get("file_name", ""),  # 从state中获取文件名
+                "file_name": state.get("file_name", ""),  
                 "user_id": state["user_id"],
                 "current": 100,
                 "stage_name": "完整审查结果",
@@ -445,8 +446,41 @@ class AIReviewWorkflow:
 
     def _get_workflow_graph(self):
         """获取工作流图(可视化输出)"""
-        grandalf_graph = self.graph.get_graph()
-        grandalf_graph.print_ascii()
+        try:
+            grandalf_graph = self.graph.get_graph()
+            grandalf_graph.print_ascii()
+            self._save_workflow_graph(grandalf_graph)
+
+        except Exception as e:
+            logger.warning(f"生成工作流图失败: {str(e)}")
+
+    def _save_workflow_graph(self, graph):
+        """保存工作流图到temp文件夹"""
+        try:
+            # 创建temp目录
+            temp_dir = "temp"
+
+            file_path = os.path.join(temp_dir, f"ai_review_workflow_graph.png")
+
+            png_data = graph.draw_mermaid_png()
+
+            # 保存到文件
+            with open(file_path, "wb") as f:
+                f.write(png_data)
+
+            logger.info(f"工作流图已保存到: {file_path}")
+
+        except Exception as e:
+            try:
+                
+                ascii_file = os.path.join("temp", f"ai_review_workflow_graph.txt")
+                with open(ascii_file, 'w', encoding='utf-8') as f:
+                    graph.print_ascii(file=f)
+                logger.info(f"工作流ASCII图已保存到: {ascii_file}")
+            except Exception as ascii_error:
+                logger.warning(f"保存ASCII图也失败: {str(ascii_error)}")
+
+            logger.warning(f"保存工作流图PNG失败: {str(e)}")
  
     async def _get_status(self) -> dict:
         """获取工作流状态"""
@@ -470,8 +504,8 @@ class AIReviewCoreFun:
         self.ai_review_engine = ai_review_engine
         self.max_review_units = max_review_units
         self.review_mode = review_mode
-        # 添加消息推送锁,确保事件顺序发送
         self.message_lock = asyncio.Lock()
+        self.inter_tool = InterTool()
         
 
     async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
@@ -503,7 +537,7 @@ class AIReviewCoreFun:
                         section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
                         logger.info(f"section_label:  {section_label}")
                         # 格式化issues以获取问题数量
-                        issues = self._format_review_results_to_issues(
+                        issues = self.inter_tool._format_review_results_to_issues(
                             state["callback_task_id"],
                             unit_index,
                             f"第{unit_content.get('page', '')}页:{section_label}",
@@ -591,27 +625,14 @@ class AIReviewCoreFun:
             section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
             page = unit_content.get('page', '')
             review_location_label = f"第{page}页:{section_label}"
-
-            # 设置review_location_label到AIReviewEngine
-            self.ai_review_engine.set_review_location_label(review_location_label)
-            logger.info(f"review_location_label:  {review_location_label}")
-            stage_name = f"AI审查:{section_label}"
-
-            # 方法内部进度计算(基于当前处理的单元)
-            current_progress = int((unit_index / total_units) * 100)
-
-            # 构建进度消息
-            #message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
-            #logger.info(f"开始处理单元: {unit_index + 1}/{total_units} - {section_label}")
-
-            # 并发执行各种原子化审查方法,添加超时控制
+            logger.info(f"test_review_location_label:{trace_id_idx}: {review_location_label}")
             review_tasks = [
                 asyncio.wait_for(
-                    self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
+                    self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, review_location_label),
                     timeout=REVIEW_TIMEOUT
                 ),
                 asyncio.wait_for(
-                    self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
+                    self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content,review_location_label),
                     timeout=REVIEW_TIMEOUT
                 ),
             ]
@@ -808,268 +829,6 @@ class AIReviewCoreFun:
             except:
                 return 0
 
-    def _format_review_results_to_issues(self,callback_task_id: str, unit_index: int, review_location_label: str,
-                                        unit_content: Dict[str, Any], basic_result: Dict[str, Any],
-                                        technical_result: Dict[str, Any]) -> List[Dict[str, Any]]:
-        """
-        将审查结果格式化为issues结构
-
-        Args:
-            callback_task_id: 回调任务ID,用于生成唯一issue_id
-            unit_index: 单元索引,用于生成唯一issue_id
-            review_location_label: 审查位置标签,如"第3页:第一章"
-            unit_content: 单元内容,包含原始文本等信息
-            basic_result: 基础合规性审查结果,包含各项检查结果
-            technical_result: 技术性审查结果,包含技术标准检查结果
-
-        Returns:
-            List[Dict]: 格式化后的issues列表,每个issue包含:
-                - issue_id: 唯一标识符,格式为"{callback_task_id}-{risk_level}-{unit_index}"
-                - metadata: 元数据,包含审查位置和原始内容
-                - risk_summary: 风险摘要,包含最高风险等级和问题数量统计
-                - review_lists: 详细审查问题列表
-
-        Note:
-            自动跳过overall_score字段,提取所有检查项的详细结果
-            支持风险等级统计和最高风险等级确定
-        """
-        issues = []
-        review_lists = []
-        risk_count = {"high": 0, "medium": 0, "low": 0}
-        max_risk_level = "low"
-
-        # 合并所有审查结果
-        all_results = {}
-        if basic_result:
-            all_results.update(basic_result)
-        if technical_result:
-            all_results.update(technical_result)
-
-        logger.info(f"开始格式化审查结果,合并后结果: {list(all_results.keys())}")
-
-        for check_key, check_result in all_results.items():
-            logger.info(f"处理检查项: {check_key}, 结果类型: {type(check_result)}")
-
-            if check_key == 'overall_score':  # 跳过分数字段
-                logger.info(f"跳过分数字段: {check_key}")
-                continue
-
-            if check_result and "details" in check_result and "response" in check_result["details"]:
-                response = check_result["details"]["response"]
-                check_name = check_result["details"].get("name", check_key)
-                logger.info(f"解析检查项 {check_name} 的响应,长度: {len(response)}")
-                check_issues = self._parse_ai_review_response(response, check_name)
-                review_lists.extend(check_issues)
-            else:
-                logger.warning(f"检查项 {check_key} 格式不符合要求,缺少details或response字段")
-                logger.warning(f"check_result内容: {check_result}")
-
-        # 统计风险等级
-        for issue in review_lists:
-            risk_level = issue.get("risk_info", {}).get("risk_level", "low")
-            if risk_level in risk_count:
-                risk_count[risk_level] += 1
-
-        # 确定最高风险等级
-        if risk_count["high"] > 0:
-            max_risk_level = "high"
-        elif risk_count["medium"] > 0:
-            max_risk_level = "medium"
-
-        # 如果有审查结果,创建issue
-        if review_lists:
-            issue_id = f"{callback_task_id}-{max_risk_level if max_risk_level else '0'}-{unit_index}"
-            issue = {
-                issue_id: {
-                    "metadata": {
-                        "review_location_label": review_location_label,
-                        "original_content": unit_content.get('content', '')
-                    },
-                    "risk_summary": {
-                        "max_risk_level": max_risk_level if max_risk_level else '0',
-                        "risk_count": risk_count if risk_count else 0
-                    },
-                    "review_lists": review_lists
-                }
-            }
-            issues.append(issue)
-
-        return issues
-
-    def _parse_ai_review_response(self,response: str, check_name: str) -> List[Dict[str, Any]]:
-        """
-        解析AI审查的JSON格式响应
-
-        Args:
-            response: AI审查响应内容
-            check_name: 检查项名称(如"词句语法检查")
-
-        Returns:
-            List[Dict]: 解析后的审查结果列表,包含check_item、check_result、exist_issue、risk_info等字段
-
-        Note:
-            支持识别"无明显问题"等关键词,自动设置风险等级
-            解析新的英文键名JSON格式: issue_point, location, suggestion, reason, risk_level
-        """
-        review_lists = []
-
-        try:
-            if "无明显问题" in response or "无问题" in response or "符合要求" in response:
-                review_lists.append({
-                    "check_item": check_name,
-                    "check_result": "无明显问题",
-                    "exist_issue": False,
-                    "risk_info": {"risk_level": "low"}
-                })
-                return review_lists
-
-            # 尝试解析JSON格式响应
-
-            # 首先检查响应是否直接被{}包裹
-            response_stripped = response.strip()
-            json_data = None
-
-            if (response_stripped.startswith('{') and response_stripped.endswith('}')) or \
-               (response_stripped.startswith('{\n') and response_stripped.endswith('\n}')):
-                # 响应被{}包裹,直接解析
-                try:
-                    json_data = json.loads(response_stripped)
-                except json.JSONDecodeError:
-                    logger.warning(f"直接JSON解析失败,尝试查找JSON代码块")
-
-            # 如果直接解析失败,查找JSON代码块
-            if json_data is None:
-                # 改进的正则表达式:正确处理多行JSON和转义字符
-                json_pattern = r'```json\s*(\[[\s\S]*?\]|\{[\s\S]*?\})\s*```'
-                json_matches = re.findall(json_pattern, response)
-
-                # 如果改进的正则仍然匹配不到,使用更宽松的模式
-                if not json_matches:
-                    json_pattern = r'```json\s*(.*?)\s*```'
-                    json_matches = re.findall(json_pattern, response, re.DOTALL)
-
-            else:
-                json_matches = [response_stripped]  # 使用整个响应作为JSON
-
-            # 如果所有方法都失败,尝试最后的JSON提取策略
-            if not json_matches:
-                logger.warning("标准JSON提取失败,尝试智能JSON提取策略")
-
-                # 策略1: 查找被JSON数组/对象包围的内容
-                array_pattern = r'\[\s*\{.*?\}\s*\]'
-                object_pattern = r'\{[^{}]*"issue_point"[^{}]*\}'
-
-                array_matches = re.findall(array_pattern, response, re.DOTALL)
-                if array_matches:
-                    json_matches = array_matches
-                    logger.info(f"数组模式匹配成功,找到 {len(array_matches)} 个JSON数组")
-                else:
-                    object_matches = re.findall(object_pattern, response, re.DOTALL)
-                    if object_matches:
-                        json_matches = object_matches
-                        logger.info(f"对象模式匹配成功,找到 {len(object_matches)} 个JSON对象")
-
-            # 最后的备选方案:查找包含关键JSON字段的内容
-            if not json_matches:
-                logger.warning("所有JSON提取策略失败,查找包含关键字段的内容")
-                key_pattern = r'[^`]*["\']issue_point["\'][^`]*["\']location["\'][^`]*'
-                key_matches = re.findall(key_pattern, response, re.DOTALL)
-                if key_matches:
-                    # 尝试将匹配内容包装为有效JSON
-                    for match in key_matches:
-                        # 简单包装,可能需要更复杂的逻辑
-                        wrapped_match = f'[{match}]' if not match.strip().startswith('[') else match
-                        json_matches.append(wrapped_match)
-                    logger.info(f"关键字段模式匹配成功,生成 {len(json_matches)} 个JSON候选")
-
-            if json_matches:
-                logger.info(f"成功匹配到 {len(json_matches)} 个JSON代码块")
-                # 解析找到的JSON
-                for i, json_str in enumerate(json_matches):
-                    try:
-                        # 如果是直接解析的JSON,不需要重新解析
-                        if json_str == response_stripped and json_data is not None:
-                            issue_data = json_data
-                            logger.debug(f"使用直接解析的JSON数据 (第{i+1}个)")
-                        else:
-                            # 清理JSON字符串
-                            json_str = json_str.strip()
-                            if not json_str:
-                                logger.warning(f"第{i+1}个JSON代码块为空,跳过")
-                                continue
-
-                            logger.debug(f"尝试解析第{i+1}个JSON: {json_str[:100]}...")
-                            # 解析JSON
-                            issue_data = json.loads(json_str)
-                            logger.info(f"第{i+1}个JSON解析成功,包含 {len(issue_data) if isinstance(issue_data, (list, dict)) else 0} 个元素")
-
-                        risk_level = issue_data.get("risk_level", "")
-
-                        # 确定风险等级
-                        if not risk_level:
-                            risk_level = "medium"  # 默认中等风险
-                        else:
-                            risk_level = risk_level.lower()
-                            if "高" in risk_level or "high" in risk_level:
-                                risk_level = "high"
-                            elif "中" in risk_level or "medium" in risk_level:
-                                risk_level = "medium"
-                            else:
-                                risk_level = "low"
-
-                        # 直接存储JSON字段
-                        review_lists.append({
-                            "check_item": check_name,
-                            "check_result": issue_data,  # 直接存储原始JSON数据
-                            "exist_issue": True,
-                            "risk_info": {"risk_level": risk_level}
-                        })
-
-                    except json.JSONDecodeError as je:
-                        logger.warning(f"JSON解析失败: {str(je)}, JSON内容: {json_str[:100]}...")
-                        # 如果JSON解析失败,回退到文本解析
-                        continue
-
-            # 如果没有成功解析到JSON,使用旧的文本解析方法
-            if not review_lists:
-                lines = response.split('\n')
-                risk_level = "medium"  # 默认中等风险
-
-                # 扫描提取风险等级(支持中英文)
-                for line in lines:
-                    line = line.strip()
-                    if line.startswith('**风险等级**:') or line.startswith('**risk_level**:'):
-                        if ':' in line:
-                            extracted_risk = line.split(':', 1)[1].strip().lower()
-                        elif ':' in line:
-                            extracted_risk = line.split(':', 1)[1].strip().lower()
-
-                        if "高" in extracted_risk or "high" in extracted_risk:
-                            risk_level = "high"
-                        elif "中" in extracted_risk or "medium" in extracted_risk:
-                            risk_level = "medium"
-                        else:
-                            risk_level = "low"
-                        break
-
-                review_lists.append({
-                    "check_item": check_name,
-                    "check_result": response,  # 存储完整响应作为备选
-                    "exist_issue": True,
-                    "risk_info": {"risk_level": risk_level}
-                })
-
-        except Exception as e:
-            logger.error(f"解析AI审查响应失败: {str(e)}")
-            review_lists.append({
-                "check_item": check_name,
-                "check_result": response,
-                "exist_issue": True,
-                "risk_info": {"risk_level": "low"}
-            })
-
-        return review_lists
-
     def _filter_review_units(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
         """
         根据配置筛选要审查的单元
@@ -1081,145 +840,19 @@ class AIReviewCoreFun:
             List[Dict[str, Any]]: 筛选后的审查单元
 
         Note:
-            使用预设的起始索引进行切片,确保不会跳过前面的重要内容
+            根据max_review_units和review_mode配置来筛选审查单元
         """
-        if self.max_review_units is None or self.review_mode == "all":
+        if not self.max_review_units or self.max_review_units >= len(chunks):
+            # 如果没有限制或限制数量大于等于总数,返回所有单元
             return chunks
 
-        # 验证原始chunks不为空
-        if not chunks:
-            logger.warning("没有可用的审查单元")
-            return []
-
-        # 安全的切片操作,考虑边界情况
-        # 使用预设的起始索引进行切片,避免跳过前面的内容
-        start_index = min(DEFAULT_SLICE_START_INDEX, len(chunks) - 1)  # 确保start_index不超过数组边界
-        chunks = chunks[start_index:]
-
-        # 再次验证切片后的结果
-        if not chunks:
-            logger.warning(f"从索引{start_index}切片后没有可用的审查单元")
-            return []
-
-        total_chunks = len(chunks)
-        actual_review_count = min(self.max_review_units, total_chunks)
-
-        logger.info(f"审查模式: {self.review_mode}, 总单元数: {total_chunks}, 实际审查数: {actual_review_count}")
-
         if self.review_mode == "first":
-            # 取前N个
-            return chunks[:actual_review_count]
+            # 返回前N个单元
+            return chunks[:self.max_review_units]
         elif self.review_mode == "random":
-            # 随机取N个
-            return random.sample(chunks, actual_review_count)
+            # 随机选择N个单元
+            return random.sample(chunks, self.max_review_units)
         else:
-            # 默认取前N个
-            return chunks[:actual_review_count]
-
-
-class InterTool:
-    """AI审查工具类 - 负责辅助计算和结果处理"""
-
-    def _calculate_overall_risk(self, basic_result: Dict, technical_result: Dict, rag_result: Dict) -> str:
-        """
-        计算总体风险等级
-
-        Args:
-            basic_result: 基础合规性审查结果,包含overall_score字段
-            technical_result: 技术性审查结果,包含overall_score字段
-            rag_result: RAG增强审查结果(当前未使用)
-
-        Returns:
-            str: 风险等级 ("low", "medium", "high")
-
-        Note:
-            风险等级计算逻辑:
-            - low: 基础和技术审查都达到90分以上
-            - medium: 基础和技术审查都达到70分以上
-            - high: 其他情况
-            异常情况下返回默认风险等级medium
-        """
-        try:
-            # 基于各种审查结果计算风险等级
-            # 风险等级计算逻辑:基础和技术审查都达到90分以上为低风险,70分以上为中风险,否则为高风险
-            basic_score = basic_result.get('overall_score', 0)
-            technical_score = technical_result.get('overall_score', 0)
-
-            if basic_score >= 90 and technical_score >= 90:
-                return "low"
-            elif basic_score >= 70 and technical_score >= 70:
-                return "medium"
-            else:
-                return "high"
-        except (KeyError, TypeError, ValueError) as e:
-            logger.warning(f"风险等级计算异常: {str(e)},使用默认风险等级")
-            return DEFAULT_RISK_LEVEL
-
-    def _aggregate_results(self, successful_results: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
-        """
-        汇总审查结果(issues格式)
-
-        Args:
-            successful_results: 成功的审查结果列表(issues格式),每个单元返回一个issues列表
-
-        Returns:
-            Dict[str, Any]: 汇总后的统计信息,包含以下字段:
-                - risk_stats: 各风险等级的数量统计 {"low": 0, "medium": 0, "high": 0}
-                - total_reviewed: 成功审查的总数量
-                - total_issues: 总问题数量
-
-        Note:
-            当输入为空时返回空字典,异常时记录错误并返回空字典
-        """
-        try:
-            if not successful_results:
-                return {}
-
-            # 计算风险等级统计和问题总数
-            risk_stats = {"low": 0, "medium": 0, "high": 0}
-            total_issues = 0
-
-            for unit_issues in successful_results:
-                # 每个unit_issues是一个issues列表
-                if unit_issues and isinstance(unit_issues, list):
-                    total_issues += len(unit_issues)
-
-                    # 统计每个issue中的风险等级
-                    for issue in unit_issues:
-                        if isinstance(issue, dict):
-                            # issue格式: {issue_id: {risk_summary: {...}}}
-                            for issue_data in issue.values():
-                                risk_summary = issue_data.get('risk_summary', {})
-                                max_risk = risk_summary.get('max_risk_level', '0')
-
-                                if max_risk in risk_stats:
-                                    risk_stats[max_risk] += 1
-                                elif max_risk == '0':
-                                    risk_stats['low'] += 1  # 无风险视为低风险
-
-            return {
-                'risk_stats': risk_stats,
-                'total_reviewed': len(successful_results),
-                'total_issues': total_issues
-            }
-        except (ZeroDivisionError, KeyError, TypeError) as e:
-            logger.error(f"结果汇总失败: {str(e)}")
-            return {}
-
-    def _check_ai_review_result(self, state: AIReviewState) -> str:
-        """
-        检查AI审查结果状态
-
-        Args:
-            state: AI审查工作流状态
-
-        Returns:
-            str: 状态标识 ("success" 或 "error")
-
-        Note:
-            根据状态中是否存在错误信息来判断审查是否成功
-        """
-        if state.get("error_message"):
-            return "error"
-        return "success"
+            # 默认返回前N个单元
+            return chunks[:self.max_review_units]
 

+ 3 - 2
foundation/database/base/sql/__init__.py

@@ -6,8 +6,9 @@ SQL数据库基础组件模块
 
 from .async_mysql_conn_pool import AsyncMySQLPool
 from .async_mysql_base_dao import AsyncBaseDAO
-
+from .bus_data_query import BasisOfPreparationDAO
 __all__ = [
     "AsyncMySQLPool",
-    "AsyncBaseDAO"
+    "AsyncBaseDAO",
+    "BasisOfPreparationDAO"
 ]

+ 36 - 0
foundation/database/base/sql/bus_data_query.py

@@ -0,0 +1,36 @@
+from typing import List, Tuple, Any, Optional, Dict
+from foundation.observability.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from foundation.database.base.sql.async_mysql_base_dao import AsyncBaseDAO
+
+
class BasisOfPreparationDAO(AsyncBaseDAO):
    """Async DAO for the ``t_basis_of_preparation`` table (编制依据 / basis of preparation).

    List-style queries only return rows with ``status = 'current'``; the
    primary-key lookup returns the row regardless of status.
    """

    async def get_info_by_id(self, id: int) -> Optional[Dict]:
        """Return the record with the given primary key, or ``None`` if absent."""
        query = "SELECT * FROM t_basis_of_preparation WHERE id = %s"
        return await self.fetch_one(query, (id,))

    async def get_list(self) -> List[Dict]:
        """Return all 'current' records, newest first."""
        query = (
            "SELECT * FROM t_basis_of_preparation "
            "WHERE status = 'current' ORDER BY created_at DESC"
        )
        return await self.fetch_all(query)

    async def get_info_by_condition(self, conditions: Dict) -> List[Dict]:
        """Return 'current' records matching all equality ``conditions``.

        Args:
            conditions: Mapping of column name -> required value. An empty
                mapping falls back to :meth:`get_list`.

        Returns:
            Matching rows ordered by ``created_at`` descending.

        Raises:
            ValueError: If any condition key is not a plain identifier.
                Column names cannot be bound as ``%s`` parameters, so they
                must be validated before being interpolated into the SQL
                text — otherwise a caller-controlled key is a SQL-injection
                vector.
        """
        if not conditions:
            return await self.get_list()

        # Values are bound via %s placeholders as usual; only the column
        # names are interpolated, and only after this whitelist check.
        invalid_fields = [f for f in conditions if not str(f).isidentifier()]
        if invalid_fields:
            raise ValueError(f"非法查询字段: {invalid_fields}")

        try:
            where_clause = " AND ".join(f"{field} = %s" for field in conditions)
            query = (
                f"SELECT * FROM t_basis_of_preparation WHERE {where_clause} "
                "AND status = 'current' ORDER BY created_at DESC"
            )
            return await self.fetch_all(query, tuple(conditions.values()))
        except Exception as err:
            handler_err(logger=server_logger, err=err, err_name="条件查询失败")
            raise

Разлика између датотеке није приказан због своје велике величине
+ 63490 - 0
logs/agent_info.log.1


Разлика између датотеке није приказан због своје велике величине
+ 3 - 5
temp/AI审查结果.json


+ 236 - 0
test/test_rag_enhanced_check.py

@@ -0,0 +1,236 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+测试AI审查引擎的RAG增强检查功能
+"""
+
+import sys
+import os
+import asyncio
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from core.construction_review.component.ai_review_engine import AIReviewEngine
+from foundation.observability.logger.loggering import server_logger as logger
+
+
async def test_rag_enhanced_check():
    """Exercise the AI review engine's RAG-enhanced check end to end.

    Builds a ``unit_content`` dict from a fixed welding-report excerpt,
    points the engine at a known Milvus collection, runs
    ``rag_enhanced_check`` and prints the ``vector_search`` results.
    Requires a reachable review-engine/Milvus environment; all failures
    are caught and printed rather than raised.
    """
    try:
        print("=" * 80)
        print("开始测试AI审查引擎的RAG增强检查功能")
        print("=" * 80)

        # Create the review engine under test.
        review_engine = AIReviewEngine()

        # Milvus collection name (taken from test_rag.py).
        # NOTE(review): assigned directly as an attribute — confirm
        # AIReviewEngine actually reads `milvus_collection` from instance state.
        collection_name = "first_bfp_collection"
        review_engine.milvus_collection = collection_name

        # Fixture: the unit content to review.
        # Same query content as test_rag.py for comparable results.
        test_content = """按照《公路钢结构桥梁制造与安装施工规范》(JTG/T3651-2022)的要求,对圆
柱头焊钉进行了焊接工艺评定试验,试验共计1 组,焊接9 个剪力钉,其中3 个用来
做弯曲试验,3 个用来做拉伸试验,3 个备用。施焊参数见表4.4.3。
表4.4.3 焊钉施焊参数
编号
焊接方法
剪力钉直径
(mm)
伸出长度
(mm)
提升高度
(mm)
焊接时间
(s)
焊接电流
(A)
YJ-J1
电弧螺柱焊
22
6
3
1.0
1900~2200
焊缝挤出焊脚均匀饱满,外观质量合格。
4.4.3 圆柱头焊钉焊接工艺评定试验结果
按规定对焊钉取3 根进行锤击30º弯曲试验,另外取3 根进行拉伸试验,试验结
果见表4.4.4,接头力学性能满足技术要求。
鹿寨至钦州港公路郁江特大桥钢箱梁
焊接工艺评定总结报告
42
表4.4.4 焊钉锤击30º弯曲试验和拉伸试验结果
规格
锤击30º弯曲试验
拉伸试验
YJ-J1
Φ22×140+
Q355D-δ18
编号
焊缝状况
编号
拉力(KN)
断口
部位
评定标准
1
完好
1
202.05
母材
拉力载荷不
小于
159.6KN,断
口不在焊缝
或热影响区
2
完好
2
201.50
母材
3
完好
3
201.95
母材
鹿寨至钦州港公路郁江特大桥钢箱梁
焊接工艺评定总结报告
43
五、焊接工艺评定试验结果分析
5.1 焊缝强度
1.1 验收标准:焊缝屈服强度和抗拉强度不低于母材标准值。
材质
板厚范围(mm)
ReL(MPa)
Rm(MPa)
A(%)
Q355D
t≤16
≥355
≥470
≥22
16<t≤40
≥345
≥470
≥22
1.2 根据表4.1.6 中试验结果整理出各类接头强度散点图如下,所有焊缝的屈服
强度、抗拉强度均超过了母材标准值。
5.2 焊缝金属延伸率
2.1 验收标准:不低于母材标准值。
2.2 结果分析:根据表4.1.6 中试验结果整理出散点图如下,所有焊缝的延伸率
均在母材标准要求值之上。
鹿寨至钦州港公路郁江特大桥钢箱梁
焊接工艺评定总结报告
44
5.3 接头韧性
3.1 验收标准:根据设计要求,对接接头及熔透角接接头焊缝中心及热影响区焊
缝V 型缺口低温冲击试验结果的平均值不低于下表的规定值,且任一试验结果不得
低于0.70 倍的标准值。
钢材牌号
试验温度(℃)
冲击韧性(J)
Q355D
-20
34
注:板厚≤20mm 的薄板接头冲击功规定值为27J
3.2 结果分析:从表4.1.6 中试验数据可以看出:所有试验结果中冲击功最小平均
值为YJ-D2 试件的焊缝中心冲击功47J,大于验收值;单一试件的冲击功最小值为
YJ-D2(板厚t=18mm)试件的焊缝中心冲击功31J,大于验收值。
5.4 接头弯曲
4.1 验收标准:对接接头弯曲180°,试样受拉面上的裂纹总长不大于试样宽度
的15%,且单个裂纹长度不大于3mm,则判为合格。
4.2 结果分析:所有对接接头的弯曲试验均完好未产生裂纹。
5.5 接头硬度
5.1
验收标准:不大于HV380。
5.2
结果分析:根据表4.1.6 中试验结果整理出接头各区最高硬度曲线图如下,
所有接头各区的硬度低于HV380。
鹿寨至钦州港公路郁江特大桥钢箱梁
焊接工艺评定总结报告
45
5.6 接头宏观断面腐蚀
焊缝均熔合良好,未见焊接缺陷。
鹿寨至钦州港公路郁江特大桥钢箱梁
焊接工艺评定总结报告
46
六、结论
从以上试验的焊缝检验和试验结果可以看出:焊缝内、外部质量良好,通过宏观"""

        # Build the unit_content payload expected by the engine.
        unit_content = {
            'content': test_content,
            'review_references': ''  # may be empty or carry reference material
        }

        print(f"测试内容长度: {len(test_content)} 字符")
        print(f"使用的集合名称: {collection_name}")

        # Run the RAG-enhanced check and time it with the event-loop clock.
        print("\n开始执行RAG增强检查...")
        start_time = asyncio.get_event_loop().time()

        # NOTE(review): called without `await` — if rag_enhanced_check is a
        # coroutine function this stores an un-awaited coroutine; confirm it
        # is a synchronous method.
        result = review_engine.rag_enhanced_check(unit_content)

        end_time = asyncio.get_event_loop().time()
        elapsed_time = end_time - start_time

        print(f"\nRAG增强检查完成,耗时: {elapsed_time:.2f}秒")

        # Inspect and pretty-print the results.
        print("\n" + "=" * 60)
        print("检查结果分析:")
        print("=" * 60)

        if 'vector_search' in result:
            vector_search_results = result['vector_search']
            print(f"返回结果数量: {len(vector_search_results)}")

            for i, search_result in enumerate(vector_search_results):
                print(f"\n--- 搜索结果 {i + 1} ---")

                if isinstance(search_result, dict):
                    # Show the first 100 characters of the matched document.
                    text_content = search_result.get('text_content', '')
                    print(f"文档内容(前100字符): {text_content[:100]}...")

                    # Show the metadata attached to the hit, if any.
                    metadata = search_result.get('metadata', {})
                    if metadata:
                        print(f"文件名称: {metadata.get('file_name', 'N/A')}")
                        print(f"标题: {metadata.get('title', 'N/A')}")
                        print(f"文件ID: {metadata.get('file_id', 'N/A')}")
                        print(f"主键: {metadata.get('pk', 'N/A')}")
                    else:
                        print("元数据: 无")
                else:
                    print(f"结果格式异常: {type(search_result)} - {search_result}")

        else:
            print("错误: 结果中未找到'vector_search'字段")
            print(f"完整结果: {result}")

        print("\n" + "=" * 80)
        print("RAG增强检查测试完成")
        print("=" * 80)

    except Exception as e:
        # Best-effort test script: report the failure instead of raising.
        print(f"\n[ERROR] 测试失败: {str(e)}")
        import traceback
        print("详细错误信息:")
        traceback.print_exc()
+
+
async def main():
    """Top-level entry point: delegate to the RAG enhanced-check test."""
    await test_rag_enhanced_check()


if __name__ == "__main__":
    print("启动RAG增强检查测试...")
    asyncio.run(main())

+ 1 - 1
views/test_views.py

@@ -25,7 +25,7 @@ from views import test_router, get_operation_id
 from foundation.ai.agent.workflow.test_workflow_graph import test_workflow_graph
 
 from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
-from database.repositories.bus_data_query import BasisOfPreparationDAO
+from foundation.database.base.sql import BasisOfPreparationDAO
 from foundation.utils.tool_utils import DateTimeEncoder
 from langchain_core.prompts import ChatPromptTemplate
 from foundation.utils.yaml_utils import system_prompt_config

Неке датотеке нису приказане због велике количине промена