|
|
@@ -17,26 +17,21 @@
|
|
|
├── execute() # 执行AI审查工作流
|
|
|
├── _start_node() # 开始节点
|
|
|
├── _initialize_progress_node() # 初始化进度节点
|
|
|
-├── _ai_review_node() # AI审查核心节点
|
|
|
+├── _ai_review_node() # AI审查核心节点(基于review_config)
|
|
|
+├── _ai_review_node_check_item() # AI审查项检查节点(基于review_item_config)
|
|
|
├── _save_results_node() # 保存结果节点(入库/本地文件)
|
|
|
├── _complete_node() # 完成节点
|
|
|
-└── _error_handler_node() # 错误处理节点
|
|
|
+├── _error_handler_node() # 错误处理节点
|
|
|
+└── _terminate_node() # 终止节点
|
|
|
|
|
|
-🔍 审查处理方法:
|
|
|
-├── _filter_review_units() # 筛选审查单元
|
|
|
-├── _calculate_overall_risk() # 计算总体风险等级
|
|
|
-├── _aggregate_results() # 汇总审查结果
|
|
|
-├── _format_review_results_to_issues() # 格式化审查结果为问题列表
|
|
|
-└── parse_ai_review_response() # 解析AI审查响应
|
|
|
-
|
|
|
-🛠️ 工具辅助方法:
|
|
|
-├── _check_ai_review_result() # 检查AI审查结果
|
|
|
+🛠️ 工作流辅助方法:
|
|
|
+├── _should_check_item_or_dimensions() # 检查应该使用哪种审查配置
|
|
|
+├── _should_terminate_or_error() # 检查是否应该终止或发生错误
|
|
|
├── _get_workflow_graph() # 获取工作流图(可视化)
|
|
|
-└── _get_status() # 获取工作流状态
|
|
|
+├── _save_workflow_graph() # 保存工作流图到temp文件夹
|
|
|
+├── _get_status() # 获取工作流状态
|
|
|
+└── _dummy_review_task() # 空任务(方法不存在时使用)
|
|
|
|
|
|
-⚙️ 配置管理:
|
|
|
-├── __init__() # 初始化工作流(支持审查模式配置)
|
|
|
-└── set_review_location_label() # 设置审查位置标签
|
|
|
'''
|
|
|
|
|
|
import asyncio
|
|
|
@@ -46,6 +41,7 @@ import re
|
|
|
from sre_parse import JUMP
|
|
|
import time
|
|
|
import os
|
|
|
+from typing import Dict, Tuple, Union, List
|
|
|
from dataclasses import dataclass, asdict
|
|
|
from typing import Optional, Callable, Dict, Any, TypedDict, Annotated, List
|
|
|
from langgraph.graph import StateGraph, END
|
|
|
@@ -55,15 +51,10 @@ from foundation.observability.logger.loggering import server_logger as logger
|
|
|
from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
|
|
|
from ..component import AIReviewEngine
|
|
|
from ..component.reviewers.utils.inter_tool import InterTool
|
|
|
-from core.base.task_models import TaskFileInfo
|
|
|
+from core.base.task_models import TaskFileInfo
|
|
|
+from .core_functions import AIReviewCoreFun
|
|
|
+from .types import AIReviewState
|
|
|
|
|
|
-# 常量定义
|
|
|
-DEFAULT_SLICE_START_INDEX = 30
|
|
|
-MAX_PROGRESS_PERCENTAGE = 100
|
|
|
-RISK_LEVELS = {"high": "高风险", "medium": "中风险", "low": "低风险"}
|
|
|
-DEFAULT_RISK_LEVEL = "medium"
|
|
|
-REVIEW_TIMEOUT = 120 # 单个审查任务超时时间(秒),包括基础审查和技术审查
|
|
|
-WORKFLOW_TIMEOUT = 3600 # 整个工作流超时时间(秒,30分钟)
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
@@ -76,23 +67,6 @@ class ReviewResult:
|
|
|
rag_enhanced: Dict[str, Any]
|
|
|
overall_risk: str
|
|
|
|
|
|
-class AIReviewState(TypedDict):
|
|
|
- """AI审查工作流状态"""
|
|
|
-
|
|
|
- file_id: str
|
|
|
- callback_task_id: str
|
|
|
- file_name: str
|
|
|
- user_id: str
|
|
|
- structured_content: Dict[str, Any]
|
|
|
- review_results: Optional[Dict[str, Any]]
|
|
|
- current_stage: str
|
|
|
- status: str
|
|
|
- error_message: Optional[str]
|
|
|
- progress_manager: Optional[Any]
|
|
|
- # 消息日志(用于LangGraph状态追踪)
|
|
|
- messages: Annotated[List[BaseMessage], add_messages]
|
|
|
-
|
|
|
-
|
|
|
|
|
|
class AIReviewWorkflow:
|
|
|
"""基于LangGraph的AI审查工作流"""
|
|
|
@@ -109,10 +83,12 @@ class AIReviewWorkflow:
|
|
|
max_review_units: 最大审查单元数量(None表示审查所有)
|
|
|
review_mode: 审查模式 ("all"=全部, "first"=前N个, "random"=随机N个)
|
|
|
"""
|
|
|
+ # 工作流超时时间定义
|
|
|
+ self.WORKFLOW_TIMEOUT = 3600
|
|
|
+
|
|
|
# 任务文件信息
|
|
|
self.task_info = task_file_info
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
self.file_id = task_file_info.file_id
|
|
|
self.callback_task_id = task_file_info.callback_task_id
|
|
|
self.user_id = task_file_info.user_id
|
|
|
@@ -130,6 +106,7 @@ class AIReviewWorkflow:
|
|
|
self.inter_tool = InterTool()
|
|
|
|
|
|
self.max_review_units = max_review_units
|
|
|
+ self.max_concurrent = 20 # 规范性与时效性审查最大并发数
|
|
|
self.review_mode = review_mode
|
|
|
|
|
|
# 延迟导入 WorkflowManager(避免循环导入)
|
|
|
@@ -155,6 +132,7 @@ class AIReviewWorkflow:
|
|
|
workflow.add_node("start", self._start_node)
|
|
|
workflow.add_node("initialize_progress", self._initialize_progress_node)
|
|
|
workflow.add_node("ai_review", self._ai_review_node)
|
|
|
+ workflow.add_node("ai_review_check_item",self._ai_review_node_check_item)
|
|
|
workflow.add_node("save_results", self._save_results_node) # 添加保存结果节点
|
|
|
workflow.add_node("complete", self._complete_node)
|
|
|
workflow.add_node("error_handler", self._error_handler_node)
|
|
|
@@ -162,7 +140,15 @@ class AIReviewWorkflow:
|
|
|
|
|
|
workflow.set_entry_point("start")
|
|
|
workflow.add_edge("start", "initialize_progress")
|
|
|
- workflow.add_edge("initialize_progress", "ai_review")
|
|
|
+ # 添加条件边:根据配置类型选择不同的审查节点
|
|
|
+ workflow.add_conditional_edges(
|
|
|
+ "initialize_progress",
|
|
|
+ self._should_check_item_or_dimensions,
|
|
|
+ {
|
|
|
+ "activate_ai_review_check_item": "ai_review_check_item", # 使用 review_item_config
|
|
|
+ "activate_ai_review": "ai_review" # 使用 review_config
|
|
|
+ }
|
|
|
+ )
|
|
|
|
|
|
# 添加条件边(错误处理 + 终止检查)
|
|
|
workflow.add_conditional_edges(
|
|
|
@@ -175,6 +161,17 @@ class AIReviewWorkflow:
|
|
|
}
|
|
|
)
|
|
|
|
|
|
+ # 添加条件边(错误处理 + 终止检查)
|
|
|
+ workflow.add_conditional_edges(
|
|
|
+ "ai_review_check_item",
|
|
|
+ self._should_terminate_or_error,
|
|
|
+ {
|
|
|
+ "terminate": "terminate", # 终止路径
|
|
|
+ "success": "save_results", # 成功后先保存结果
|
|
|
+ "error": "error_handler" # 错误处理
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
# 添加保存结果到完成的边
|
|
|
workflow.add_edge("save_results", "complete")
|
|
|
workflow.add_edge("complete", END)
|
|
|
@@ -206,7 +203,7 @@ class AIReviewWorkflow:
|
|
|
# 执行LangGraph工作流,添加超时控制
|
|
|
result = await asyncio.wait_for(
|
|
|
self.graph.ainvoke(initial_state),
|
|
|
- timeout=WORKFLOW_TIMEOUT
|
|
|
+ timeout=self.WORKFLOW_TIMEOUT
|
|
|
)
|
|
|
|
|
|
logger.info(f"LangGraph AI审查工作流完成,文件ID: {self.file_id}")
|
|
|
@@ -227,7 +224,7 @@ class AIReviewWorkflow:
|
|
|
return review_results
|
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
- logger.error(f"AI审查工作流超时({WORKFLOW_TIMEOUT}秒),文件ID: {self.file_id}")
|
|
|
+ logger.error(f"AI审查工作流超时({self.WORKFLOW_TIMEOUT}秒),文件ID: {self.file_id}")
|
|
|
raise TimeoutError(f"AI审查工作流执行超时,请检查文件大小或网络连接")
|
|
|
except Exception as e:
|
|
|
logger.error(f"LangGraph AI审查工作流执行失败: {str(e)}")
|
|
|
@@ -268,6 +265,163 @@ class AIReviewWorkflow:
|
|
|
"messages": [AIMessage(content="进度初始化完成")]
|
|
|
}
|
|
|
|
|
|
+ async def _ai_review_node_check_item(self, state: AIReviewState) -> AIReviewState:
|
|
|
+ """
|
|
|
+ 检查当前项是否需要AI审查 - 新版本
|
|
|
+
|
|
|
+ 执行流程:
|
|
|
+ 1. 终止信号检查
|
|
|
+ 2. 解析审查项配置
|
|
|
+ 3. 优先处理大纲审查
|
|
|
+ 4. 按章节处理(basis章节 vs 普通章节)
|
|
|
+ 5. 汇总结果并构建响应
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ logger.info(f"AI审查项检查开始执行,任务ID: {self.task_info.callback_task_id}")
|
|
|
+
|
|
|
+ # 1️ 终止信号检查
|
|
|
+ if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
|
|
|
+ logger.warning(f"AI审查项检查检测到终止信号,任务ID: {state['callback_task_id']}")
|
|
|
+ return {
|
|
|
+ "status": "terminated",
|
|
|
+ "current_stage": "ai_review_check_item",
|
|
|
+ "messages": [AIMessage(content="检测到终止信号")]
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2️ 解析审查项配置
|
|
|
+ review_func_mapping: Dict[str, Union[str, List[str]]] = {
|
|
|
+ 'sensitive_word_check': 'sensitive_word_check',
|
|
|
+ 'semantic_logic_check': 'check_semantic_logic',
|
|
|
+ 'completeness_check': ['check_completeness', 'outline_check'],
|
|
|
+ 'timeliness_check': 'timeliness_basis_reviewer',
|
|
|
+ 'reference_check': 'reference_basis_reviewer',
|
|
|
+ 'sensitive_check': 'check_sensitive',
|
|
|
+ 'non_parameter_compliance_check': 'check_non_parameter_compliance',
|
|
|
+ 'parameter_compliance_check': 'check_parameter_compliance'
|
|
|
+ }
|
|
|
+
|
|
|
+ review_item_config_raw = self.task_info.get_review_item_config_list()
|
|
|
+ review_item_config = self.core_fun._replace_review_suffix(review_item_config_raw, review_func_mapping)
|
|
|
+
|
|
|
+ review_item_dict = {}
|
|
|
+ for item in review_item_config:
|
|
|
+ key, value = item.split("_", 1)
|
|
|
+ review_item_dict.setdefault(key, []).append(value)
|
|
|
+
|
|
|
+ logger.info(f"审查项配置解析完成: {review_item_dict}")
|
|
|
+
|
|
|
+ # 3️ 获取结构化内容
|
|
|
+ structured_content = state.get("structured_content", {})
|
|
|
+ chunks = structured_content.get("chunks", [])
|
|
|
+ total_chapters = len(review_item_dict)
|
|
|
+ total_chunks = len(chunks)
|
|
|
+
|
|
|
+ logger.info(f"准备执行动态审查任务,总章节数: {total_chapters}, 总块数: {total_chunks}")
|
|
|
+
|
|
|
+ # 发送开始审查进度
|
|
|
+ await self.core_fun._send_start_review_progress(state, total_chunks, 'check_item_review')
|
|
|
+
|
|
|
+ # 初始化issues列表
|
|
|
+ all_issues = []
|
|
|
+ completed_chunks = 0
|
|
|
+
|
|
|
+ # 4️ 优先处理大纲审查(在章节循环之前)
|
|
|
+ outline_data = structured_content.get("outline", {})
|
|
|
+ has_outline_check = any(
|
|
|
+ "outline_check" in funcs
|
|
|
+ for funcs in review_item_dict.values()
|
|
|
+ )
|
|
|
+
|
|
|
+ if has_outline_check and outline_data:
|
|
|
+ logger.info(" 开始执行大纲审查")
|
|
|
+ try:
|
|
|
+ outline_result = await self.ai_review_engine.outline_check(
|
|
|
+ state["callback_task_id"],
|
|
|
+ {"outline": outline_data},
|
|
|
+ state,
|
|
|
+ "大纲审查"
|
|
|
+ )
|
|
|
+ if outline_result:
|
|
|
+ all_issues.append(outline_result)
|
|
|
+ logger.info(f"大纲审查完成")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"大纲审查失败: {str(e)}", exc_info=True)
|
|
|
+
|
|
|
+ # 5️ 按章节分组
|
|
|
+ chapter_chunks_map = self.core_fun._group_chunks_by_chapter(chunks)
|
|
|
+ logger.info(f"内容分组完成,共 {len(chapter_chunks_map)} 个章节")
|
|
|
+
|
|
|
+ # 6️ 按章节处理
|
|
|
+ for chapter_idx, (chapter_code, func_names) in enumerate(review_item_dict.items()):
|
|
|
+ logger.info(f" 处理章节 [{chapter_idx+1}/{total_chapters}]: {chapter_code},包含 {len(func_names)} 个审查任务")
|
|
|
+
|
|
|
+ # 终止信号检查(章节级别)
|
|
|
+ if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
|
|
|
+ logger.warning(f"章节审查检测到终止信号,任务ID: {state['callback_task_id']}")
|
|
|
+ break
|
|
|
+
|
|
|
+ # 获取当前章节的内容
|
|
|
+ chapter_content = chapter_chunks_map.get(chapter_code, [])
|
|
|
+ if not chapter_content:
|
|
|
+ logger.warning(f"章节 {chapter_code} 没有找到对应内容,跳过")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 判断章节类型并分支处理
|
|
|
+ if chapter_code == "basis":
|
|
|
+ # === 编制依据章节:拼接所有chunk后一次性审查 ===
|
|
|
+ await self.core_fun._process_basis_chapter(
|
|
|
+ chapter_code, chapter_content, func_names, state, all_issues, completed_chunks, total_chunks
|
|
|
+ )
|
|
|
+ # 更新已完成块数
|
|
|
+ completed_chunks += len(chapter_content)
|
|
|
+ else:
|
|
|
+ # === 普通章节:逐块审查 ===
|
|
|
+ chunks_completed = await self.core_fun._process_normal_chapter(
|
|
|
+ chapter_code, chapter_content, func_names, state, all_issues
|
|
|
+ )
|
|
|
+ # 更新已完成块数
|
|
|
+ completed_chunks += chunks_completed
|
|
|
+
|
|
|
+ logger.info(f"章节 {chapter_code} 处理完成")
|
|
|
+
|
|
|
+ # 7️ 汇总结果
|
|
|
+ summary = self.inter_tool._aggregate_results(all_issues)
|
|
|
+
|
|
|
+ # 8️ 构建完整的响应结构
|
|
|
+ review_results = {
|
|
|
+ "callback_task_id": state["callback_task_id"],
|
|
|
+ "file_name": state.get("file_name", ""),
|
|
|
+ "user_id": state["user_id"],
|
|
|
+ "current": 100,
|
|
|
+ "stage_name": "审查项检查结果",
|
|
|
+ "status": "full_review_result",
|
|
|
+ "message": f"审查项检查完成,共发现{summary.get('total_issues', 0)}个问题",
|
|
|
+ "overall_task_status": "completed",
|
|
|
+ "updated_at": int(time.time()),
|
|
|
+ "issues": all_issues
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(f"AI审查项检查执行成功,任务ID: {state['callback_task_id']}, 共发现{summary.get('total_issues', 0)}个问题")
|
|
|
+
|
|
|
+ # 返回新的状态
|
|
|
+ return {
|
|
|
+ "current_stage": "ai_review_check_item_completed",
|
|
|
+ "review_results": review_results,
|
|
|
+ "status": "completed",
|
|
|
+ "messages": [AIMessage(content=f"AI审查项检查完成,共发现{summary.get('total_issues', 0)}个问题")]
|
|
|
+ }
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"AI审查项检查执行失败,任务ID: {state['callback_task_id']}, 错误: {str(e)}", exc_info=True)
|
|
|
+
|
|
|
+ # 返回错误状态
|
|
|
+ return {
|
|
|
+ "current_stage": "ai_review_check_item_failed",
|
|
|
+ "error_message": str(e),
|
|
|
+ "status": "failed",
|
|
|
+ "messages": [AIMessage(content=f"AI审查项检查失败: {str(e)}")]
|
|
|
+ }
|
|
|
+
|
|
|
async def _ai_review_node(self, state: AIReviewState) -> AIReviewState:
|
|
|
"""
|
|
|
AI审查节点
|
|
|
@@ -291,7 +445,7 @@ class AIReviewWorkflow:
|
|
|
}
|
|
|
|
|
|
test_designation_chunk_flag = self.task_info.get_test_designation_chunk_flag()
|
|
|
- logger.info(f"测试定位标志: {test_designation_chunk_flag}")
|
|
|
+ logger.debug(f"测试定位标志: {test_designation_chunk_flag}")
|
|
|
|
|
|
# 1. 准备审查单元数据
|
|
|
review_chunks, total_units = await self.core_fun._prepare_review_units(state, test_designation_chunk_flag)
|
|
|
@@ -336,6 +490,8 @@ class AIReviewWorkflow:
|
|
|
"messages": [AIMessage(content="检测到终止信号")]
|
|
|
}
|
|
|
|
|
|
+ # 开始条文完整性审查
|
|
|
+
|
|
|
|
|
|
# 开始大纲审查
|
|
|
await self.core_fun._send_start_review_progress(state, total_units,'outline')
|
|
|
@@ -368,7 +524,7 @@ class AIReviewWorkflow:
|
|
|
# 准备编制依据审查数据
|
|
|
prep_basis_review_data = {
|
|
|
'content': prep_basis_content,
|
|
|
- 'max_concurrent': 20
|
|
|
+ 'max_concurrent': self.max_concurrent
|
|
|
}
|
|
|
|
|
|
# 执行编制依据审查
|
|
|
@@ -398,7 +554,7 @@ class AIReviewWorkflow:
|
|
|
# 准备编制依据审查数据
|
|
|
timeliness_check_data = {
|
|
|
'content': prep_basis_content,
|
|
|
- 'max_concurrent': 20
|
|
|
+ 'max_concurrent': self.max_concurrent
|
|
|
}
|
|
|
|
|
|
# 执行编制依据审查
|
|
|
@@ -633,6 +789,37 @@ class AIReviewWorkflow:
|
|
|
"messages": [AIMessage(content="任务已被终止")]
|
|
|
}
|
|
|
|
|
|
+ def _should_check_item_or_dimensions(self, state: AIReviewState) -> str:
|
|
|
+ """
|
|
|
+ 检查应该使用 review_item_config 还是 review_config
|
|
|
+
|
|
|
+ Args:
|
|
|
+ state: AI审查工作流状态
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ str: "success" 使用 review_item_config(章节_审查维度格式)
|
|
|
+ "error" 使用 review_config(审查维度格式)
|
|
|
+
|
|
|
+ Note:
|
|
|
+ 基于互斥验证逻辑:
|
|
|
+ 1. review_config 和 review_item_config 互斥,只有一个有值
|
|
|
+ 2. 如果 review_item_config 不为 None,走 ai_review_check_item 节点
|
|
|
+ 3. 如果 review_config 不为 None,走 ai_review 节点
|
|
|
+ """
|
|
|
+ # 获取 review_item_config 列表(使用 get_review_item_config_list 方法)
|
|
|
+ review_item_config = self.task_info.get_review_item_config_list()
|
|
|
+
|
|
|
+ # 判断是否应该使用 review_item_config
|
|
|
+ if len(review_item_config) > 0:
|
|
|
+ # 使用 review_item_config(章节_审查维度格式)
|
|
|
+ logger.info(f"使用 review_item_config 进行审查: {review_item_config}")
|
|
|
+ return "activate_ai_review_check_item"
|
|
|
+ else:
|
|
|
+ # 使用 review_config(审查维度格式)
|
|
|
+ review_config = self.task_info.get_review_config_list()
|
|
|
+ logger.info(f"使用 review_config 进行审查: {review_config}")
|
|
|
+ return "activate_ai_review"
|
|
|
+
|
|
|
def _should_terminate_or_error(self, state: AIReviewState) -> str:
|
|
|
"""
|
|
|
检查是否应该终止或发生错误
|
|
|
@@ -748,524 +935,3 @@ class AIReviewWorkflow:
|
|
|
return {}
|
|
|
|
|
|
|
|
|
-class AIReviewCoreFun:
|
|
|
- """AI审查核心功能类 - 负责具体的审查逻辑和数据处理"""
|
|
|
-
|
|
|
- def __init__(self, task_file_info: TaskFileInfo, ai_review_engine, max_review_units: int = None, review_mode: str = "all"):
|
|
|
- """
|
|
|
- 初始化AI审查核心功能类
|
|
|
-
|
|
|
- Args:
|
|
|
- task_file_info: TaskFileInfo 实例,包含任务相关信息
|
|
|
- ai_review_engine: AI审查引擎实例
|
|
|
- max_review_units: 最大审查单元数量(None表示审查所有)
|
|
|
- review_mode: 审查模式 ("all"=全部, "first"=前N个, "random"=随机N个)
|
|
|
- """
|
|
|
- self.task_info = task_file_info # ✅ 保存 TaskFileInfo 实例
|
|
|
- self.ai_review_engine = ai_review_engine
|
|
|
- self.max_review_units = max_review_units
|
|
|
- self.review_mode = review_mode
|
|
|
- self.message_lock = asyncio.Lock()
|
|
|
- self.inter_tool = InterTool()
|
|
|
-
|
|
|
- # ✅ 便捷访问属性
|
|
|
- self.file_id = task_file_info.file_id
|
|
|
- self.callback_task_id = task_file_info.callback_task_id
|
|
|
- self.user_id = task_file_info.user_id
|
|
|
- self.review_config = task_file_info.review_config
|
|
|
- self.project_plan_type = task_file_info.project_plan_type
|
|
|
-
|
|
|
-
|
|
|
- async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
|
|
|
- total_units: int, state: AIReviewState,
|
|
|
- check_terminate: bool = False) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 执行并发审查
|
|
|
-
|
|
|
- Args:
|
|
|
- review_chunks: 审查单元列表
|
|
|
- total_units: 总单元数
|
|
|
- state: AI审查状态
|
|
|
- check_terminate: 是否检查终止信号(默认False)
|
|
|
-
|
|
|
- Returns:
|
|
|
- List[Dict[str, Any]]: 审查结果列表(issues格式)
|
|
|
- """
|
|
|
-
|
|
|
- try:
|
|
|
- # 获取 workflow_manager 实例(延迟导入避免循环依赖)
|
|
|
- from core.base.workflow_manager import WorkflowManager
|
|
|
- workflow_manager = WorkflowManager()
|
|
|
-
|
|
|
- semaphore = asyncio.Semaphore(3) # 并发审查数
|
|
|
-
|
|
|
- async def process_unit_and_notify(unit_index, unit_content):
|
|
|
- """处理单个单元,完成后立即推送通知"""
|
|
|
- async with semaphore:
|
|
|
- # ⚠️ 检查终止信号(每个单元审查前)
|
|
|
- if check_terminate:
|
|
|
- if await workflow_manager.check_terminate_signal(state["callback_task_id"]):
|
|
|
- logger.warning(f"并发审查检测到终止信号,停止后续审查: {state['callback_task_id']}")
|
|
|
- return None # 返回 None 表示被终止
|
|
|
-
|
|
|
- # 执行单个单元审查
|
|
|
- result = await self._review_single_unit(unit_content, unit_index, total_units, state)
|
|
|
-
|
|
|
- # 审查完成后立即推送通知
|
|
|
- if result.overall_risk != "error":
|
|
|
- section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
|
|
|
- chapter_code = unit_content.get('chapter_classification', '')
|
|
|
- logger.info(f"section_label: {section_label}")
|
|
|
- # 格式化issues以获取问题数量
|
|
|
- issues = self.inter_tool._format_review_results_to_issues(
|
|
|
- state["callback_task_id"],
|
|
|
- unit_index,
|
|
|
- f"第{unit_content.get('page', '')}页:{section_label}",
|
|
|
- chapter_code,
|
|
|
- unit_content,
|
|
|
- result.basic_compliance,
|
|
|
- result.technical_compliance
|
|
|
- )
|
|
|
-
|
|
|
- current = int(((unit_index + 1) / total_units) * 100)
|
|
|
-
|
|
|
- # 立即发送单元审查详情(包含unit_review和processing_flag事件)
|
|
|
- await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
|
|
|
- return issues
|
|
|
- else:
|
|
|
- logger.error(f"执行单个单元审查失败: {str(result.error_message)}")
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
- # 创建并发任务
|
|
|
- tasks = [
|
|
|
- asyncio.create_task(process_unit_and_notify(i, unit_content))
|
|
|
- for i, unit_content in enumerate(review_chunks)
|
|
|
- ]
|
|
|
-
|
|
|
- # 等待所有任务完成
|
|
|
- all_results = await asyncio.gather(*tasks)
|
|
|
-
|
|
|
- # 过滤有效结果(issues格式)
|
|
|
- successful_results = [issues for issues in all_results if issues and isinstance(issues, list)]
|
|
|
- return successful_results
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"执行并发审查失败: {str(e)}")
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- async def _prepare_review_units(self, state: AIReviewState, test_designation_chunk_flag) -> tuple:
|
|
|
- """准备审查单元数据 (增加清理旧进度缓存)"""
|
|
|
- try:
|
|
|
- # 筛选要审查的单元
|
|
|
- all_chunks = state['structured_content']['chunks']
|
|
|
- # 筛除编制依据章节
|
|
|
- clearned_chunks = self._remove_basis_chunks(all_chunks)
|
|
|
-
|
|
|
- # 判断是否需要筛选指定测试章节
|
|
|
- if test_designation_chunk_flag is not None:
|
|
|
- # 用户指定了测试章节,进行筛选
|
|
|
- logger.info(f"开始筛选指定测试章节: {test_designation_chunk_flag}")
|
|
|
- designation_test_chunk = self._designation_test_chunks(clearned_chunks, test_designation_chunk_flag)
|
|
|
-
|
|
|
- if not designation_test_chunk:
|
|
|
- # 指定了测试章节但未找到,返回空列表
|
|
|
- logger.warning(f"未找到包含关键字 '{test_designation_chunk_flag}' 的测试章节,建议去除前后符号(如《》())使用简洁关键词")
|
|
|
- review_chunks = []
|
|
|
- else:
|
|
|
- # 找到指定测试章节
|
|
|
- logger.info(f"找到 {len(designation_test_chunk)} 个指定测试章节")
|
|
|
- review_chunks = designation_test_chunk
|
|
|
- else:
|
|
|
- # 未指定测试章节,使用正常筛选流程
|
|
|
- logger.info(f"未指定测试章节,使用正常筛选流程")
|
|
|
- review_chunks = self._filter_review_units(clearned_chunks)
|
|
|
-
|
|
|
- total_units = len(review_chunks)
|
|
|
- logger.info(f"最终审查单元数量: {total_units}")
|
|
|
-
|
|
|
- # 【修复 3】: 任务开始前,清理 Redis 中的旧计数,防止进度条计算错误
|
|
|
- try:
|
|
|
- task_id = state.get("callback_task_id", "")
|
|
|
- if task_id:
|
|
|
- redis_client = await RedisConnectionFactory.get_connection()
|
|
|
- completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
|
|
|
- await redis_client.delete(completed_key)
|
|
|
- logger.info(f"已清理旧进度缓存: {completed_key}")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"清理进度缓存失败 (不影响主流程): {str(e)}")
|
|
|
- return review_chunks, total_units
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"准备审查单元失败: {str(e)}")
|
|
|
- raise
|
|
|
-
|
|
|
- def _remove_basis_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 筛除编制依据章节的chunks
|
|
|
-
|
|
|
- Args:
|
|
|
- chunks: 所有章节chunks列表
|
|
|
-
|
|
|
- Returns:
|
|
|
- List[Dict[str, Any]]: 筛除编制依据章节后的chunks列表
|
|
|
-
|
|
|
- Note:
|
|
|
- 根据 chapter_classification 字段筛选,排除值为 "basis" 的章节
|
|
|
- """
|
|
|
- try:
|
|
|
- filtered_chunks = []
|
|
|
- removed_count = 0
|
|
|
- logger.info(f"开始筛除编制依据章节")
|
|
|
- for chunk in chunks:
|
|
|
- # 检查章节分类字段
|
|
|
- chapter_classification = chunk.get('chapter_classification', '')
|
|
|
-
|
|
|
- # 保留非编制依据章节
|
|
|
- if chapter_classification != 'basis':
|
|
|
- logger.info(f"保留非编制依据章节,当前章节: {chapter_classification}")
|
|
|
- filtered_chunks.append(chunk)
|
|
|
- else:
|
|
|
- removed_count += 1
|
|
|
- logger.debug(f"筛除编制依据章节: {chunk.get('section_label', '未知章节')}")
|
|
|
-
|
|
|
- logger.info(f"编制依据章节筛除完成: 共筛除 {removed_count} 个章节, 保留 {len(filtered_chunks)} 个章节")
|
|
|
-
|
|
|
- return filtered_chunks
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"筛除编制依据章节失败: {str(e)}")
|
|
|
- # 出错时返回原始列表
|
|
|
- return chunks
|
|
|
- def _designation_test_chunks(self, chunks: List[Dict[str, Any]],test_designation_chunk_flag:str) -> List[Dict[str, Any]]:
|
|
|
- """筛选设计测试章节
|
|
|
-
|
|
|
- Args:
|
|
|
- chunks: 所有章节chunks列表
|
|
|
-
|
|
|
- Returns:
|
|
|
- List[Dict[str, Any]]: 筛选后的chunks列表
|
|
|
-
|
|
|
- Note:
|
|
|
- 根据 chapter_classification 字段筛选,排除值为 "designation_test" 的章节
|
|
|
-
|
|
|
- Raises:
|
|
|
- Exception: 筛选失败
|
|
|
-
|
|
|
- """
|
|
|
- try:
|
|
|
- designation_chunks = []
|
|
|
- filtered_count = 0
|
|
|
-
|
|
|
- logger.info(f"开始筛选设计测试章节")
|
|
|
- for chunk in chunks:
|
|
|
- content = chunk.get('content', '')
|
|
|
- section_label = chunk.get('section_label', '未知章节')
|
|
|
- logger.info(f"正在处理章节: {section_label}")
|
|
|
- if test_designation_chunk_flag in content or test_designation_chunk_flag in section_label:
|
|
|
- logger.info(f"已命中指定测试章节: {chunk.get('section_label', '未知章节')}")
|
|
|
- designation_chunks.append(chunk)
|
|
|
- else:
|
|
|
- filtered_count += 1
|
|
|
- logger.debug(f"跳过章节: {chunk.get('section_label', '未知章节')}")
|
|
|
- if not designation_chunks:
|
|
|
- logger.info(f"未找到指定测试章节,请修改关键字尝试!")
|
|
|
-
|
|
|
- return designation_chunks
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"筛选设计测试章节失败: {str(e)}")
|
|
|
- # 出错时返回原始列表
|
|
|
- return chunks
|
|
|
-
|
|
|
- async def _review_single_unit(self, unit_content: Dict[str, Any], unit_index: int,
|
|
|
- total_units: int, state: AIReviewState) -> ReviewResult:
|
|
|
- """
|
|
|
- 审查单个单元的核心业务逻辑
|
|
|
-
|
|
|
- Args:
|
|
|
- unit_content: 单元内容
|
|
|
- unit_index: 单元索引
|
|
|
- total_units: 总单元数
|
|
|
- state: AI审查状态
|
|
|
-
|
|
|
- Returns:
|
|
|
- ReviewResult: 单元审查结果
|
|
|
- """
|
|
|
- try:
|
|
|
-
|
|
|
- # 构建Trace ID
|
|
|
- trace_id_idx = f"({state['callback_task_id']}-{unit_index})"
|
|
|
-
|
|
|
- # 获取section_label用于stage_name
|
|
|
- section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
|
|
|
- page = unit_content.get('page', '')
|
|
|
- review_location_label = f"第{page}页:{section_label}"
|
|
|
- stage_name = f"AI审查:{section_label}"
|
|
|
- #logger.info(f"test_review_location_label:{trace_id_idx}: {review_location_label}")
|
|
|
- review_tasks = [
|
|
|
- asyncio.create_task(
|
|
|
- asyncio.wait_for(
|
|
|
- self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, review_location_label,state,stage_name),
|
|
|
- timeout=REVIEW_TIMEOUT
|
|
|
- )
|
|
|
- ),
|
|
|
- asyncio.create_task(
|
|
|
- asyncio.wait_for(
|
|
|
- self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content,review_location_label,state,stage_name),
|
|
|
- timeout=REVIEW_TIMEOUT
|
|
|
- )
|
|
|
- ),
|
|
|
- ]
|
|
|
-
|
|
|
- # 使用 asyncio.gather 保证结果顺序,并提供超时控制
|
|
|
- # 整体超时 = 两个任务的超时之和 + 缓冲时间
|
|
|
- total_timeout = REVIEW_TIMEOUT * len(review_tasks) + 10
|
|
|
-
|
|
|
- try:
|
|
|
- # 使用 gather 确保结果顺序与任务顺序一致
|
|
|
- review_results = await asyncio.wait_for(
|
|
|
- asyncio.gather(*review_tasks, return_exceptions=True),
|
|
|
- timeout=total_timeout
|
|
|
- )
|
|
|
- except asyncio.TimeoutError:
|
|
|
- logger.error(f"[工作流] 审查任务整体超时: trace_id={trace_id_idx}")
|
|
|
- # 填充超时结果
|
|
|
- review_results = [
|
|
|
- Exception(f"基础审查超时({REVIEW_TIMEOUT}秒)"),
|
|
|
- Exception(f"技术审查超时({REVIEW_TIMEOUT}秒)")
|
|
|
- ]
|
|
|
-
|
|
|
- # 处理异常结果(gather 已经将异常作为结果返回)
|
|
|
- basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
|
|
|
- technical_result = review_results[1] if not isinstance(review_results[1], Exception) else {"error": str(review_results[1])}
|
|
|
-
|
|
|
- # RAG检查已注释,提供空结果
|
|
|
- rag_result = {"error": "RAG check disabled"}
|
|
|
-
|
|
|
- # 计算总体风险等级
|
|
|
- inter_tool = InterTool()
|
|
|
- overall_risk = inter_tool._calculate_overall_risk(basic_result, technical_result, rag_result)
|
|
|
-
|
|
|
-
|
|
|
- return ReviewResult(
|
|
|
- unit_index=unit_index,
|
|
|
- unit_content=unit_content,
|
|
|
- basic_compliance=basic_result,
|
|
|
- technical_compliance=technical_result,
|
|
|
- rag_enhanced=rag_result,
|
|
|
- overall_risk=overall_risk
|
|
|
- )
|
|
|
-
|
|
|
- except asyncio.TimeoutError:
|
|
|
- logger.error(f"审查单元 {unit_index} 超时")
|
|
|
- return ReviewResult(
|
|
|
- unit_index=unit_index,
|
|
|
- unit_content=unit_content,
|
|
|
- basic_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
|
|
|
- technical_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
|
|
|
- rag_enhanced={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
|
|
|
- overall_risk="error"
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"审查单元 {unit_index} 失败: {str(e)}")
|
|
|
- return ReviewResult(
|
|
|
- unit_index=unit_index,
|
|
|
- unit_content=unit_content,
|
|
|
- basic_compliance={"error": str(e)},
|
|
|
- technical_compliance={"error": str(e)},
|
|
|
- rag_enhanced={"error": str(e)},
|
|
|
- overall_risk="error"
|
|
|
- )
|
|
|
-
|
|
|
- async def _send_start_review_progress(self, state: AIReviewState, total_units: int = None, review_type : str =None) -> None:
|
|
|
- """
|
|
|
- 发送开始审查的进度更新
|
|
|
-
|
|
|
- Args:
|
|
|
- state: AI审查状态
|
|
|
- total_units: 总审查单元数
|
|
|
- """
|
|
|
-
|
|
|
-
|
|
|
- try:
|
|
|
-
|
|
|
-
|
|
|
- if state["progress_manager"]:
|
|
|
- if review_type is "outline":
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name="AI审查",
|
|
|
- current=0,
|
|
|
- status="processing",
|
|
|
- message=f"开始大纲审查",
|
|
|
- event_type="processing"
|
|
|
- )
|
|
|
- # elif review_type is "prpe_basis":
|
|
|
- # await state["progress_manager"].update_stage_progress(
|
|
|
- # callback_task_id=state["callback_task_id"],
|
|
|
- # stage_name="AI审查",
|
|
|
- # current=0,
|
|
|
- # total=total_units,
|
|
|
- # status="processing",
|
|
|
- # message=f"开始编制依据审查",
|
|
|
- # event_type="processing"
|
|
|
- # )
|
|
|
- else:
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name="AI审查",
|
|
|
- current=0,
|
|
|
- status="processing",
|
|
|
- message=f"开始核心审查,共 {total_units} 个审查单元",
|
|
|
- event_type="processing"
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"发送开始进度更新失败: {str(e)}")
|
|
|
-
|
|
|
- async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
|
|
|
- total_units: int, section_label: str,
|
|
|
- issues: List[Dict], current: int) -> None:
|
|
|
- """
|
|
|
- 发送单元审查详细信息 - 强制串行并统一进度值
|
|
|
- """
|
|
|
- async with self.message_lock:
|
|
|
- try:
|
|
|
- # 1. 计算问题数量
|
|
|
- issues_count = 0
|
|
|
- if isinstance(issues, list) and issues:
|
|
|
- issues_count = sum(
|
|
|
- 1 for issue in issues
|
|
|
- for issue_data in issue.values()
|
|
|
- for review_item in issue_data.get("review_lists", [])
|
|
|
- if review_item.get("exist_issue", False)
|
|
|
- )
|
|
|
-
|
|
|
- real_current = await self._send_unit_overall_progress(
|
|
|
- state, unit_index, total_units, section_label, issues_count
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
- final_current = real_current if real_current is not None else current
|
|
|
-
|
|
|
- await asyncio.sleep(0.05)
|
|
|
-
|
|
|
- # 3. 发送单元详情 (Unit Review)
|
|
|
- if isinstance(issues, list) and issues and state["progress_manager"]:
|
|
|
- stage_name = f"AI审查:{section_label}"
|
|
|
-
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name=stage_name,
|
|
|
- current=final_current, # 【关键】使用与 Flag 事件完全一致的进度值
|
|
|
- status="unit_review_update",
|
|
|
- message=f"发现{issues_count}个问题: {section_label}",
|
|
|
- issues=issues,
|
|
|
- user_id=state.get("user_id", ""),
|
|
|
- overall_task_status="processing",
|
|
|
- event_type="unit_review"
|
|
|
- )
|
|
|
-
|
|
|
- # 再次微小延迟,确保 Clear 不会吞掉 Review
|
|
|
- await asyncio.sleep(0.02)
|
|
|
-
|
|
|
- # 清空当前issues
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- issues=['clear']
|
|
|
- )
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"发送单元审查详情失败: {str(e)}")
|
|
|
-
|
|
|
- async def _send_unit_overall_progress(self, state: AIReviewState, unit_index: int,
|
|
|
- total_units: int, section_label: str,
|
|
|
- issues_count: int = None) -> Optional[int]:
|
|
|
- """
|
|
|
- 发送单元完成进度更新 - 返回计算出的实时进度
|
|
|
- Returns:
|
|
|
- int: 基于 Redis 统计的实时进度百分比
|
|
|
- """
|
|
|
- current_percent = None
|
|
|
- try:
|
|
|
- task_id = state.get("callback_task_id", "")
|
|
|
- redis_client = None
|
|
|
- try:
|
|
|
- redis_client = await RedisConnectionFactory.get_connection()
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"Redis连接失败: {str(e)}")
|
|
|
-
|
|
|
- completed_count = 0
|
|
|
-
|
|
|
- if redis_client and task_id:
|
|
|
- completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
|
|
|
- # 原子操作
|
|
|
- await redis_client.sadd(completed_key, str(unit_index))
|
|
|
- await redis_client.expire(completed_key, 3600)
|
|
|
- completed_count = await redis_client.scard(completed_key)
|
|
|
-
|
|
|
- # 计算进度
|
|
|
- current_percent = int((completed_count / total_units) * 100)
|
|
|
- else:
|
|
|
- # 降级方案
|
|
|
- completed_count = unit_index + 1
|
|
|
- current_percent = int((completed_count / total_units) * 100)
|
|
|
-
|
|
|
- # 构建消息
|
|
|
- if issues_count is not None and issues_count > 0:
|
|
|
- message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
|
|
|
- else:
|
|
|
- message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}"
|
|
|
-
|
|
|
- logger.info(f"进度更新: {current_percent}% - {message}")
|
|
|
-
|
|
|
- if state["progress_manager"]:
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name="AI审查",
|
|
|
- current=current_percent,
|
|
|
- status="processing",
|
|
|
- message=message,
|
|
|
- user_id=state.get("user_id", ""),
|
|
|
- overall_task_status="processing",
|
|
|
- event_type="processing_flag"
|
|
|
- )
|
|
|
-
|
|
|
- return current_percent
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"发送单元完成进度更新失败: {str(e)}")
|
|
|
- # 发生异常时,尝试返回一个基于 index 的估算值
|
|
|
- try:
|
|
|
- return int(((unit_index + 1) / total_units) * 100)
|
|
|
- except:
|
|
|
- return 0
|
|
|
-
|
|
|
- def _filter_review_units(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 根据配置筛选要审查的单元
|
|
|
-
|
|
|
- Args:
|
|
|
- chunks: 所有审查单元
|
|
|
-
|
|
|
- Returns:
|
|
|
- List[Dict[str, Any]]: 筛选后的审查单元
|
|
|
-
|
|
|
- Note:
|
|
|
- 根据max_review_units和review_mode配置来筛选审查单元
|
|
|
- """
|
|
|
- if not self.max_review_units or self.max_review_units >= len(chunks):
|
|
|
- # 如果没有限制或限制数量大于等于总数,返回所有单元
|
|
|
- return chunks
|
|
|
-
|
|
|
- if self.review_mode == "first":
|
|
|
- # 返回前N个单元
|
|
|
- return chunks[:self.max_review_units]
|
|
|
- elif self.review_mode == "random":
|
|
|
- # 随机选择N个单元
|
|
|
- return random.sample(chunks, self.max_review_units)
|
|
|
- else:
|
|
|
- # 默认返回所有审查单元
|
|
|
- return chunks
|
|
|
-
|