#77 refactor(ai_review_workflow): 代码审查优化与注释规范化

Sapludināts
WangXuMing sapludināja 1 revīzijas no CRBC-MaaS-Platform-Project/dev_sgsc_wxm uz CRBC-MaaS-Platform-Project/dev 4 dienas atpakaļ
1 mainītis faili ar 145 papildinājumiem un 193 dzēšanām
  1. 145 193
      core/construction_review/workflows/ai_review_workflow.py

+ 145 - 193
core/construction_review/workflows/ai_review_workflow.py

@@ -6,7 +6,7 @@
 @File      : ai_review_workflow.py
 @IDE       : VsCode
 @Author    :
-@Date      : 2025-12-01 11:53:02
+@Date      : 2025-04-15
 
 =================================
 
@@ -29,7 +29,7 @@
 ├── _get_workflow_graph()           # 获取工作流图(可视化)
 ├── _save_workflow_graph()          # 保存工作流图到temp文件夹
 ├── _get_status()                   # 获取工作流状态
-└── _dummy_review_task()            # 空任务(方法不存在时使用)
+└── _extract_prep_basis_content()   # 从结构化内容中提取编制依据文本
 
 '''
 
@@ -37,12 +37,10 @@ import asyncio
 import json
 import random
 import re
-from sre_parse import JUMP
 import time
 import os
-from typing import Dict, Tuple, Union, List
+from typing import Dict, Tuple, Union, List, Optional, Callable, Any, TypedDict, Annotated
 from dataclasses import dataclass, asdict
-from typing import Optional, Callable, Dict, Any, TypedDict, Annotated, List
 from langgraph.graph import StateGraph, END
 from langgraph.graph.message import add_messages
 from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
@@ -52,7 +50,6 @@ from foundation.observability.cachefiles import cache, CacheBaseDir
 from core.construction_review.component.reviewers.utils.directory_extraction import (
     extract_basis_with_langchain_qwen,
 )
-from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from ..component.ai_review_engine import AIReviewEngine
 from ..component.reviewers.utils.inter_tool import InterTool
 from core.base.task_models import TaskFileInfo
@@ -60,17 +57,6 @@ from .core_functions import AIReviewCoreFun
 from .types import AIReviewState
 
 
-# @dataclass
-# class ReviewResult:
-#     """审查结果"""
-#     unit_index: int
-#     unit_content: Dict[str, Any]
-#     basic_compliance: Dict[str, Any]
-#     technical_compliance: Dict[str, Any]
-#     rag_enhanced: Dict[str, Any]
-#     overall_risk: str
-
-
 class AIReviewWorkflow:
     """基于LangGraph的AI审查工作流"""
 
@@ -109,9 +95,7 @@ class AIReviewWorkflow:
         self.core_fun = AIReviewCoreFun(task_file_info, self.ai_review_engine, max_review_units, review_mode)
         self.inter_tool = InterTool()
 
-        self.max_review_units = max_review_units
         self.max_concurrent = 20 # 规范性与时效性审查最大并发数
-        self.review_mode = review_mode
 
         # 延迟导入 WorkflowManager(避免循环导入)
         from core.base.workflow_manager import WorkflowManager
@@ -135,7 +119,7 @@ class AIReviewWorkflow:
         workflow = StateGraph(AIReviewState)
         workflow.add_node("start", self._start_node)
         workflow.add_node("initialize_progress", self._initialize_progress_node)
-        workflow.add_node("ai_review_check_item",self._ai_review_node_check_item)
+        workflow.add_node("ai_review_check_item", self._ai_review_node_check_item)
         workflow.add_node("save_results", self._save_results_node)  # 添加保存结果节点
         workflow.add_node("complete", self._complete_node)
         workflow.add_node("error_handler", self._error_handler_node)
@@ -175,7 +159,20 @@ class AIReviewWorkflow:
         return self.graph
 
     async def execute(self) -> dict:
-        """执行基于LangGraph的AI审查工作流"""
+        """
+        执行基于LangGraph的AI审查工作流
+
+        Args:
+            无(通过实例属性传递参数)
+
+        Returns:
+            dict: 审查结果,包含 file_id、total_units、successful_units、
+                  failed_units、review_results、summary、status 等字段
+
+        Raises:
+            TimeoutError: 工作流执行超时
+            Exception: 工作流执行失败
+        """
         try:
             logger.info(f"开始AI审查工作流,文件ID: {self.file_id}")
             initial_state = AIReviewState(
@@ -232,7 +229,15 @@ class AIReviewWorkflow:
             raise
 
     async def _start_node(self, state: AIReviewState) -> AIReviewState:
-        """开始节点"""
+        """
+        工作流开始节点
+
+        Args:
+            state: AI审查工作流当前状态
+
+        Returns:
+            AIReviewState: 更新后的工作流状态
+        """
         logger.info(f"AI审查工作流正在启动...: {state['file_id']}")
 
         return {
@@ -242,7 +247,15 @@ class AIReviewWorkflow:
         }
 
     async def _initialize_progress_node(self, state: AIReviewState) -> AIReviewState:
-        """初始化进度节点"""
+        """
+        初始化审查进度节点
+
+        Args:
+            state: AI审查工作流当前状态
+
+        Returns:
+            AIReviewState: 更新后的工作流状态
+        """
         logger.info(f"初始化AI审查进度: {state['file_id']}")
 
         state["current_stage"] = "initialize_progress"
@@ -268,18 +281,26 @@ class AIReviewWorkflow:
     
     async def _ai_review_node_check_item(self, state: AIReviewState) -> AIReviewState:
         """
-        检查当前项是否需要AI审查 - 新版本
+        AI审查核心节点 - 基于review_item_config执行多维度审查
 
-        执行流程:
-        1. 终止信号检查
-        2. 解析审查项配置
-        3. 开始审查
-        5. 汇总结果并构建响应
+        Args:
+            state: AI审查工作流状态
+
+        Returns:
+            AIReviewState: 更新后的工作流状态,包含审查结果
+
+        执行阶段:
+            Phase 1: 终止检查与配置解析
+            Phase 2: 内容获取与预处理
+            Phase 3: 分章节执行审查
+            Phase 4: 目录审查(如配置)
+            Phase 5: 结果汇总
         """
         try:
             logger.info(f"AI审查项检查开始执行,任务ID: {self.task_info.callback_task_id}")
 
-            # 1️ 终止信号检查
+            # ===== Phase 1: 终止检查与配置解析 =====
+            # 1. 终止信号检查
             if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
                 logger.warning(f"AI审查项检查检测到终止信号,任务ID: {state['callback_task_id']}")
                 return {
@@ -288,179 +309,88 @@ class AIReviewWorkflow:
                     "messages": [AIMessage(content="检测到终止信号")]
                 }
 
-            # 2 解析审查项配置
+            # 2. 解析审查项配置
             review_func_mapping: Dict[str, Union[str, List[str]]] = {
                 'sensitive_word_check': 'sensitive_word_check',
                 'semantic_logic_check': 'check_semantic_logic',
                 'completeness_check': 'check_completeness',
-                'timeliness_check': 'timeliness_basis_reviewer',
-                'timeliness_content_check': 'timeliness_content_reviewer',
+                'timeliness_check': 'timeliness_reviewer',  # 统一入口
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
                 'non_parameter_compliance_check': 'check_non_parameter_compliance',
                 'parameter_compliance_check': 'check_parameter_compliance',
-                'catalogue_completeness_check': 'check_outline_catalogue'  # 目录完整性检查
+                'catalogue_completeness_check': 'check_outline_catalogue'
             }
 
-            # 获取审查项配置
             review_item_config_raw = self.task_info.get_review_item_config_list()
-
-            # 将review_item_config中的值拆分成chapter_code和func_name 如{['basis':["sensitive_word_check","timeliness_basis_reviewer"]]}
             review_item_config = self.core_fun._replace_review_suffix(review_item_config_raw, review_func_mapping)
 
-            # 【新增】处理时效性审查的章节映射:
-            # - basis 章节使用 timeliness_basis_reviewer(编制依据时效性)
-            # - 其他章节使用 timeliness_content_reviewer(内容时效性)
-            processed_config = []
-            for item in review_item_config:
-                if '_' in item:
-                    chapter_code, func_name = item.split('_', 1)
-                    # 如果是时效性审查,根据章节选择正确的审查器
-                    if func_name == 'timeliness_basis_reviewer':
-                        if chapter_code == 'basis':
-                            # basis 章节保持使用 timeliness_basis_reviewer
-                            processed_config.append(item)
-                        else:
-                            # 其他章节使用 timeliness_content_reviewer
-                            processed_config.append(f"{chapter_code}_timeliness_content_reviewer")
-                        continue
-                processed_config.append(item)
-            review_item_config = processed_config
-
-            # 根据标准配置对review_item_config进行排序
+            # [已简化] 移除时效性审查的章节特殊处理,统一使用 timeliness_reviewer
             review_item_dict_sorted = self.core_fun._check_item_mapping_order(review_item_config)
-            logger.info(f"[DEBUG] 审查项配置解析完成: {review_item_dict_sorted}")
-            logger.info(f"[DEBUG] 是否包含 catalogue: {'catalogue' in review_item_dict_sorted}")
-            if 'catalogue' in review_item_dict_sorted:
-                logger.info(f"[DEBUG] catalogue 审查函数: {review_item_dict_sorted['catalogue']}")
-            
-            # 3️⃣ 获取结构化内容
-            original_chunks = state.get("structured_content", {}).get("chunks", [])
+            total_chapters = len(review_item_dict_sorted)
 
-            # 安全获取 outline - 兼容无目录的情况(如某些DOCX文档)
+            # ===== Phase 2: 内容获取与预处理 =====
+            # 获取原始内容
+            original_chunks = state.get("structured_content", {}).get("chunks", [])
             outline_data = state.get("structured_content", {}).get("outline", {})
-            if isinstance(outline_data, dict):
-                original_outline = outline_data.get("chapters", [])
-            else:
-                # outline 不是字典(可能是列表或None),视为无目录
-                original_outline = []
-                logger.warning("文档 outline 格式异常或非字典类型,视为无目录结构")
+            original_outline = outline_data.get("chapters", []) if isinstance(outline_data, dict) else []
 
-            # 如果没有目录,记录警告并继续(跳过目录审查)
             if not original_outline:
                 logger.warning("文档未检测到目录结构,将跳过目录完整性审查")
 
-            # 优先使用 catalog.formatted_text(标准格式)
+            # 构建目录文本
             catalog_data = state.get("structured_content", {}).get("catalog", {})
             if isinstance(catalog_data, dict) and catalog_data.get("formatted_text"):
                 outline_content_str = catalog_data["formatted_text"]
-                logger.info(f"使用 catalog.formatted_text,长度: {len(outline_content_str)}")
             else:
-                # 回退:从 outline 构建(修复格式)
-                outline_content_list = []
-
-                def extract_title_recursive(content, parent_title=""):
-                    """递归提取 title 字段,构建标准格式"""
-                    result = []
-                    if isinstance(content, dict):
-                        # 提取当前层级的 title
-                        title = content.get("title", "")
-                        if title:
-                            if parent_title:
-                                # 二级标题缩进
-                                result.append(f"  {title}")
-                            else:
-                                # 一级标题
-                                result.append(title)
-                        # 递归提取 subsections
-                        subsections = content.get("subsections", [])
-                        if isinstance(subsections, list):
-                            for subsection in subsections:
-                                result.extend(extract_title_recursive(subsection, title))
-                    return result
-
-                for content in original_outline:
-                    titles = extract_title_recursive(content)
-                    outline_content_list.extend(titles)
-
-                outline_content_str = "\n".join(outline_content_list)
-                logger.info(f"使用 outline 构建目录文本,长度: {len(outline_content_str)}")
-            
-            # 预处理:根据 review_item_dict_sorted 中的 key 对 structured_content 进行筛选
+                outline_content_str = self._build_outline_text(original_outline)
+
+            # 筛选与合并章节内容
             filtered_chunks = [
                 chunk for chunk in original_chunks
                 if chunk.get("chapter_classification") in review_item_dict_sorted.keys()
             ]
-
-            # filtered_outline = [
-            #     outline for outline in original_outline
-            #     if outline.get("chapter_classification") in review_item_dict_sorted.keys()
-            # ]
-
-            # 筛选完整性存在完整性审查的分类,将其整章进行合并
             filtered_chunks = self.core_fun._merge_chunks_for_completeness_check(
                 filtered_chunks, review_item_dict_sorted
             )
-
             cache.filtered_chunks(filtered_chunks, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
-            # 更新 chunks 和 structured_content
-            # chunks = filtered_chunks
-            
-            # structured_content["chunks"] = chunks
 
-            total_chapters = len(review_item_dict_sorted)
-            cache.filtered_chunks(review_item_dict_sorted, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
-            # 统计所有 filtered_chunks 作为总块数(与实际处理的块数保持一致)
-            all_check_items = []
-            for check_list in review_item_dict_sorted.values():
-                all_check_items.extend(check_list)  # 把每个分类的检查项加入总列表
-
-            # 统计所有 filtered_chunks 作为总块数(与实际处理的块数保持一致)
-            # 如果配置了 catalogue_completeness_check,+1 计入目录审查单元
+            # 计算总任务数
             has_catalogue = "catalogue" in review_item_dict_sorted
             total_chunks = len(filtered_chunks) + (1 if has_catalogue else 0)
-            if has_catalogue:
-                logger.info(f"检测到 catalogue_completeness_check 配置,目录审查计入总任务数")
 
-            # 初始化issues列表
-            all_issues = []
-            completed_chunks = 0
-            # 添加目录章节内容
-            catalogue = [{
-                    "chunk_id": "outline_1",
-                    "page": 1,
-                    "content": outline_content_str,
-                    "section_label": "目录",
-                    "project_plan_type": "catalogue",
-                    "chapter_classification": "catalogue",
-                    "element_tag": {
-                        "chunk_id": "outline_1",
-                        "page": 1,
-                        "serial_number": "1"
-                    },
-                    "chapter": "目录",
-                    "title": "目录",
-                    "original_content": outline_content_str,
-                    "is_complete_field": True
-                }
-            ]
+            # 分组并初始化
             chapter_chunks_map, chapter_names = self.core_fun._group_chunks_by_chapter(filtered_chunks)
+            cache.filtered_chunks(chapter_chunks_map, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
+            await self.core_fun._send_start_review_progress(state, total_chunks, chapter_names)
 
+            # 构建目录章节数据
+            catalogue = [{
+                "chunk_id": "outline_1",
+                "page": 1,
+                "content": outline_content_str,
+                "section_label": "目录",
+                "project_plan_type": "catalogue",
+                "chapter_classification": "catalogue",
+                "element_tag": {"chunk_id": "outline_1", "page": 1, "serial_number": "1"},
+                "chapter": "目录",
+                "title": "目录",
+                "original_content": outline_content_str,
+                "is_complete_field": True
+            }]
+
+            # ===== Phase 3: 分章节执行审查 =====
+            all_issues = []
+            completed_chunks = 0
 
-            logger.info(f"内容分组完成,共 {len(chapter_chunks_map)} 个章节")
-            cache.filtered_chunks(chapter_chunks_map, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
-            await self.core_fun._send_start_review_progress(state,total_chunks, chapter_names)
-            # 6️ 按章节处理
             for chapter_idx, (chapter_code, func_names) in enumerate(review_item_dict_sorted.items()):
-                logger.info(f" 处理章节 [{chapter_idx+1}/{total_chapters}]: {chapter_code},包含 {len(func_names)} 个审查任务")
-                # logger.info(f"🔍 章节处理前: all_issues数量={len(all_issues)}")  # 调试日志
+                logger.info(f"处理章节 [{chapter_idx+1}/{total_chapters}]: {chapter_code},包含 {len(func_names)} 个审查任务")
 
                 # 终止信号检查(章节级别)
                 if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
                     logger.warning(f"章节审查检测到终止信号,任务ID: {state['callback_task_id']}")
                     break
 
-                # 获取当前章节的内容
                 chapter_content = chapter_chunks_map.get(chapter_code, [])
                 if not chapter_content:
                     logger.warning(f"章节 {chapter_code} 没有找到对应内容,跳过")
@@ -469,48 +399,26 @@ class AIReviewWorkflow:
                 chunks_completed, all_issues = await self.core_fun._process_chapter_item(
                     chapter_code, chapter_content, func_names, state, all_issues, completed_chunks, total_chunks
                 )
-                # 更新已完成块数
                 completed_chunks += chunks_completed
-
                 logger.info(f"章节 {chapter_code} 处理完成")
 
-            # 主流程完成后,处理 catalogue(目录审查)
-            # 只有配置了 catalogue_completeness_check 才执行
-            logger.info(f"[DEBUG] 检查是否需要目录审查: catalogue in dict = {'catalogue' in review_item_dict_sorted}")
+            # ===== Phase 4: 目录审查(如配置) =====
             if "catalogue" in review_item_dict_sorted:
-                logger.info("[DEBUG] 开始处理目录审查(catalogue)")
-                logger.info(f"[DEBUG] catalogue chapter_content: {len(catalogue)} 个块")
-                logger.info(f"[DEBUG] catalogue 第一个块 keys: {catalogue[0].keys() if catalogue else 'N/A'}")
-                # 目录审查只执行目录结构检查(一二级缺失检查)
                 chunks_completed, all_issues = await self.core_fun._process_chapter_item(
-                    "catalogue",                 # chapter_code
-                    catalogue,                   # chapter_content
-                    ["check_outline_catalogue"],  # 只执行目录结构检查
-                    state,
-                    all_issues,
-                    completed_chunks,
-                    total_chunks
+                    "catalogue", catalogue, ["check_outline_catalogue"],
+                    state, all_issues, completed_chunks, total_chunks
                 )
-                completed_chunks += chunks_completed  # 更新已完成块数
-                logger.info("[DEBUG] 目录审查处理完成")
-                logger.info(f"[DEBUG] 目录审查完成后 all_issues 数量: {len(all_issues)}")
-            else:
-                logger.info("[DEBUG] 未配置 catalogue_completeness_check,跳过目录审查")
+                completed_chunks += chunks_completed
 
-            logger.info(f"all_issues_结果调试: {len(all_issues)} 个问题")
-            # 7️ 汇总结果
+            # ===== Phase 5: 结果汇总 =====
             summary = self.inter_tool._aggregate_results(all_issues)
 
-            # 8️ 构建完整的响应结构(兼容 execute() 方法的期望格式)
             review_results = {
-                # 兼容旧版格式的字段
                 "total_units": total_chunks,
                 "successful_units": completed_chunks,
                 "failed_units": max(0, total_chunks - completed_chunks),
-                "review_results": all_issues,  # 审查结果列表
+                "review_results": all_issues,
                 "summary": summary,
-
-                # 额外的元信息
                 "callback_task_id": state["callback_task_id"],
                 "file_name": state.get("file_name", ""),
                 "user_id": state["user_id"],
@@ -521,9 +429,10 @@ class AIReviewWorkflow:
                 "updated_at": int(time.time())
             }
 
-            logger.info(f"AI审查项检查执行成功,任务ID: {state['callback_task_id']}, 总单元数: {total_chunks}, 已完成: {completed_chunks}, 共发现{summary.get('total_issues', 0)}个问题")
+            logger.info(f"AI审查项检查执行成功,任务ID: {state['callback_task_id']}, "
+                       f"总单元数: {total_chunks}, 已完成: {completed_chunks}, "
+                       f"共发现{summary.get('total_issues', 0)}个问题")
 
-            # 返回新的状态
             return {
                 "current_stage": "ai_review_check_item_completed",
                 "review_results": review_results,
@@ -533,8 +442,6 @@ class AIReviewWorkflow:
 
         except Exception as e:
             logger.error(f"AI审查项检查执行失败,任务ID: {state['callback_task_id']}, 错误: {str(e)}", exc_info=True)
-
-            # 返回错误状态
             return {
                 "current_stage": "ai_review_check_item_failed",
                 "error_message": str(e),
@@ -542,6 +449,35 @@ class AIReviewWorkflow:
                 "messages": [AIMessage(content=f"AI审查项检查失败: {str(e)}")]
             }
 
+    def _build_outline_text(self, original_outline: List[Dict]) -> str:
+        """
+        从 outline 数据构建目录文本
+
+        Args:
+            original_outline: 原始 outline 数据列表
+
+        Returns:
+            str: 格式化后的目录文本
+        """
+        def _extract_title_recursive(content: Dict, parent_title: str = "") -> List[str]:
+            """递归提取 title 字段,构建标准格式"""
+            result = []
+            if isinstance(content, dict):
+                title = content.get("title", "")
+                if title:
+                    result.append(f"  {title}" if parent_title else title)
+                subsections = content.get("subsections", [])
+                if isinstance(subsections, list):
+                    for subsection in subsections:
+                        result.extend(_extract_title_recursive(subsection, title))
+            return result
+
+        outline_content_list = []
+        for content in original_outline:
+            outline_content_list.extend(_extract_title_recursive(content))
+
+        return "\n".join(outline_content_list)
+
     async def _save_results_node(self, state: AIReviewState) -> AIReviewState:
         """
         保存结果节点 - 将审查结果存储到本地文件或数据库
@@ -739,7 +675,11 @@ class AIReviewWorkflow:
         return "success"
 
     def _get_workflow_graph(self):
-        """获取工作流图(可视化输出)"""
+        """
+        获取并打印工作流图(可视化输出)
+
+        将工作流图以ASCII格式输出到控制台,并保存为PNG文件
+        """
         try:
             grandalf_graph = self.graph.get_graph()
             grandalf_graph.print_ascii()
@@ -749,7 +689,15 @@ class AIReviewWorkflow:
             logger.warning(f"生成工作流图失败: {str(e)}")
 
     def _save_workflow_graph(self, graph):
-        """保存工作流图到temp文件夹"""
+        """
+        保存工作流图到temp文件夹
+
+        Args:
+            graph: LangGraph 图对象
+
+        Note:
+            优先保存为PNG格式,失败则保存为ASCII文本格式
+        """
         try:
             # 创建temp目录
             temp_dir = "temp"
@@ -766,7 +714,6 @@ class AIReviewWorkflow:
 
         except Exception as e:
             try:
-                
                 ascii_file = os.path.join("temp", f"ai_review_workflow_graph.txt")
                 with open(ascii_file, 'w', encoding='utf-8') as f:
                     graph.print_ascii(file=f)
@@ -821,7 +768,12 @@ class AIReviewWorkflow:
             return ""
 
     async def _get_status(self) -> dict:
-        """获取工作流状态"""
+        """
+        获取当前工作流状态
+
+        Returns:
+            dict: 工作流进度状态,如果未配置进度管理器则返回空字典
+        """
         if self.progress_manager:
             return await self.progress_manager.get_progress(self.callback_task_id)
         return {}