|
@@ -9,15 +9,18 @@
|
|
|
|
|
|
|
|
import asyncio
|
|
import asyncio
|
|
|
import time
|
|
import time
|
|
|
-from typing import Dict, Optional
|
|
|
|
|
|
|
+from typing import Dict, Optional, Any
|
|
|
from datetime import datetime
|
|
from datetime import datetime
|
|
|
|
|
+from langgraph.graph import StateGraph, END
|
|
|
|
|
+from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
|
|
|
from foundation.observability.logger.loggering import server_logger as logger
|
|
from foundation.observability.logger.loggering import server_logger as logger
|
|
|
from foundation.observability.monitoring.time_statistics import track_execution_time
|
|
from foundation.observability.monitoring.time_statistics import track_execution_time
|
|
|
from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
|
|
from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
|
|
|
from .progress_manager import ProgressManager
|
|
from .progress_manager import ProgressManager
|
|
|
from .redis_duplicate_checker import RedisDuplicateChecker
|
|
from .redis_duplicate_checker import RedisDuplicateChecker
|
|
|
from .task_models import TaskFileInfo, TaskChain
|
|
from .task_models import TaskFileInfo, TaskChain
|
|
|
-from ..construction_review.workflows import DocumentWorkflow,AIReviewWorkflow
|
|
|
|
|
|
|
+from ..construction_review.workflows import DocumentWorkflow, AIReviewWorkflow, ReportWorkflow
|
|
|
|
|
+from ..construction_review.workflows.types import TaskChainState
|
|
|
|
|
|
|
|
class ProgressManagerRegistry:
|
|
class ProgressManagerRegistry:
|
|
|
"""ProgressManager注册表 - 为每个任务管理独立的ProgressManager实例"""
|
|
"""ProgressManager注册表 - 为每个任务管理独立的ProgressManager实例"""
|
|
@@ -64,6 +67,9 @@ class WorkflowManager:
|
|
|
self._terminate_signal_prefix = "ai_review:terminate_signal:"
|
|
self._terminate_signal_prefix = "ai_review:terminate_signal:"
|
|
|
self._task_expire_time = 7200 # 2小时
|
|
self._task_expire_time = 7200 # 2小时
|
|
|
|
|
|
|
|
|
|
+ # LangGraph 任务链工作流(方案D)
|
|
|
|
|
+ self.task_chain_graph = None # 延迟初始化,避免循环导入
|
|
|
|
|
+
|
|
|
async def submit_task_processing(self, file_info: dict) -> str:
|
|
async def submit_task_processing(self, file_info: dict) -> str:
|
|
|
"""异步提交任务处理(用于file_upload层)"""
|
|
"""异步提交任务处理(用于file_upload层)"""
|
|
|
from foundation.infrastructure.messaging.tasks import submit_task_processing_task
|
|
from foundation.infrastructure.messaging.tasks import submit_task_processing_task
|
|
@@ -86,10 +92,15 @@ class WorkflowManager:
|
|
|
raise
|
|
raise
|
|
|
@track_execution_time
|
|
@track_execution_time
|
|
|
def submit_task_processing_sync(self, file_info: dict) -> dict:
|
|
def submit_task_processing_sync(self, file_info: dict) -> dict:
|
|
|
- """同步提交任务处理(用于Celery worker)"""
|
|
|
|
|
- try:
|
|
|
|
|
|
|
+ """
|
|
|
|
|
+ 同步提交任务处理(用于Celery worker)
|
|
|
|
|
|
|
|
- logger.info(f"提交文档处理任务: {file_info['file_id']}")
|
|
|
|
|
|
|
+ Note:
|
|
|
|
|
+ 已切换到 LangGraph 任务链工作流(方案D)
|
|
|
|
|
+ 使用统一的状态管理和嵌套子图架构
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ logger.info(f"提交文档处理任务(LangGraph方案D): {file_info['file_id']}")
|
|
|
|
|
|
|
|
# 1. 创建TaskFileInfo对象(封装任务文件信息)
|
|
# 1. 创建TaskFileInfo对象(封装任务文件信息)
|
|
|
task_file_info = TaskFileInfo(file_info)
|
|
task_file_info = TaskFileInfo(file_info)
|
|
@@ -107,153 +118,95 @@ class WorkflowManager:
|
|
|
# 5. 添加到活跃任务跟踪
|
|
# 5. 添加到活跃任务跟踪
|
|
|
self.active_chains[callback_task_id] = task_chain
|
|
self.active_chains[callback_task_id] = task_chain
|
|
|
|
|
|
|
|
- # 5. 初始化进度管理
|
|
|
|
|
|
|
+ # 6. 初始化进度管理
|
|
|
asyncio.run(self.progress_manager.initialize_progress(
|
|
asyncio.run(self.progress_manager.initialize_progress(
|
|
|
callback_task_id=callback_task_id,
|
|
callback_task_id=callback_task_id,
|
|
|
user_id=task_file_info.user_id,
|
|
user_id=task_file_info.user_id,
|
|
|
stages=[]
|
|
stages=[]
|
|
|
))
|
|
))
|
|
|
|
|
|
|
|
- # 6. 启动处理流程(同步执行)
|
|
|
|
|
- self._process_task_chain_sync(task_chain, task_file_info, task_file_info.file_type)
|
|
|
|
|
-
|
|
|
|
|
- # logger.info(f"提交文档处理任务: {callback_task_id}")
|
|
|
|
|
- logger.info(f"施工方案审查任务已完成! ")
|
|
|
|
|
- logger.info(f"文件ID: {task_file_info.file_id}")
|
|
|
|
|
- logger.info(f"文件名:{task_file_info.file_name}")
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.error(f"提交文档处理任务失败: {str(e)}")
|
|
|
|
|
- raise
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- def _process_task_chain_sync(self, task_chain: TaskChain, task_file_info: TaskFileInfo, file_type: str):
|
|
|
|
|
- """同步处理文档任务链(用于Celery worker)"""
|
|
|
|
|
- try:
|
|
|
|
|
- file_content = task_file_info.file_content
|
|
|
|
|
-
|
|
|
|
|
- # 阶段1:文档处理(串行)
|
|
|
|
|
-
|
|
|
|
|
- document_workflow = DocumentWorkflow(
|
|
|
|
|
- task_file_info=task_file_info,
|
|
|
|
|
- progress_manager=self.progress_manager,
|
|
|
|
|
- redis_duplicate_checker=self.redis_duplicate_checker
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- # 同步执行文档处理
|
|
|
|
|
- loop = asyncio.new_event_loop()
|
|
|
|
|
- asyncio.set_event_loop(loop)
|
|
|
|
|
- doc_result = loop.run_until_complete(document_workflow.execute(file_content, file_type))
|
|
|
|
|
- loop.close()
|
|
|
|
|
-
|
|
|
|
|
- task_chain.results['document'] = doc_result
|
|
|
|
|
-
|
|
|
|
|
- # 阶段2:AI审查(内部并发)
|
|
|
|
|
- task_chain.update_stage("ai_review")
|
|
|
|
|
-
|
|
|
|
|
- structured_content = doc_result['structured_content']
|
|
|
|
|
-
|
|
|
|
|
- # 读取AI审查配置
|
|
|
|
|
- import configparser
|
|
|
|
|
- config = configparser.ConfigParser()
|
|
|
|
|
- config.read('config/config.ini', encoding='utf-8')
|
|
|
|
|
-
|
|
|
|
|
- max_review_units = config.getint('ai_review', 'MAX_REVIEW_UNITS', fallback=None)
|
|
|
|
|
- if max_review_units == 0: # 如果配置为0,表示审查所有
|
|
|
|
|
- max_review_units = None
|
|
|
|
|
- review_mode = config.get('ai_review', 'REVIEW_MODE', fallback='all')
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"AI审查配置: 最大审查数量={max_review_units}, 审查模式={review_mode}")
|
|
|
|
|
|
|
+ # 7. 构建 LangGraph 任务链工作流(延迟初始化)
|
|
|
|
|
+ if self.task_chain_graph is None:
|
|
|
|
|
+ self.task_chain_graph = self._build_task_chain_workflow()
|
|
|
|
|
|
|
|
- ai_workflow = AIReviewWorkflow(
|
|
|
|
|
- task_file_info=task_file_info,
|
|
|
|
|
- structured_content=structured_content,
|
|
|
|
|
|
|
+ # 8. 构建初始状态
|
|
|
|
|
+ initial_state = TaskChainState(
|
|
|
|
|
+ file_id=task_file_info.file_id,
|
|
|
|
|
+ callback_task_id=callback_task_id,
|
|
|
|
|
+ user_id=task_file_info.user_id,
|
|
|
|
|
+ file_name=task_file_info.file_name,
|
|
|
|
|
+ file_type=task_file_info.file_type,
|
|
|
|
|
+ file_content=task_file_info.file_content,
|
|
|
|
|
+ current_stage="start",
|
|
|
|
|
+ overall_task_status="processing",
|
|
|
|
|
+ stage_status={
|
|
|
|
|
+ "document": "pending",
|
|
|
|
|
+ "ai_review": "pending",
|
|
|
|
|
+ "report": "pending"
|
|
|
|
|
+ },
|
|
|
|
|
+ document_result=None,
|
|
|
|
|
+ ai_review_result=None,
|
|
|
|
|
+ report_result=None,
|
|
|
|
|
+ error_message=None,
|
|
|
progress_manager=self.progress_manager,
|
|
progress_manager=self.progress_manager,
|
|
|
- max_review_units=max_review_units,
|
|
|
|
|
- review_mode=review_mode
|
|
|
|
|
|
|
+ task_file_info=task_file_info,
|
|
|
|
|
+ messages=[HumanMessage(content=f"开始任务链: {task_file_info.file_id}")]
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 同步执行AI审查
|
|
|
|
|
|
|
+ # 9. 执行 LangGraph 任务链工作流
|
|
|
loop = asyncio.new_event_loop()
|
|
loop = asyncio.new_event_loop()
|
|
|
asyncio.set_event_loop(loop)
|
|
asyncio.set_event_loop(loop)
|
|
|
- ai_result = loop.run_until_complete(ai_workflow.execute())
|
|
|
|
|
|
|
+ result = loop.run_until_complete(self.task_chain_graph.ainvoke(initial_state))
|
|
|
loop.close()
|
|
loop.close()
|
|
|
|
|
|
|
|
- task_chain.results['ai_review'] = ai_result
|
|
|
|
|
-
|
|
|
|
|
- # # 阶段3:报告生成(串行)
|
|
|
|
|
- # task_chain.current_stage = "report_generation"
|
|
|
|
|
-
|
|
|
|
|
- # report_workflow = ReportWorkflow(
|
|
|
|
|
- # file_id=task_chain.file_id,
|
|
|
|
|
- # callback_task_id=task_chain.callback_task_id,
|
|
|
|
|
- # user_id=task_chain.user_id,
|
|
|
|
|
- # ai_review_results=ai_result,
|
|
|
|
|
- # progress_manager=self.progress_manager
|
|
|
|
|
- # )
|
|
|
|
|
-
|
|
|
|
|
- # # 同步执行报告生成
|
|
|
|
|
- # loop = asyncio.new_event_loop()
|
|
|
|
|
- # asyncio.set_event_loop(loop)
|
|
|
|
|
- # report_result = loop.run_until_complete(report_workflow.execute())
|
|
|
|
|
- # loop.close()
|
|
|
|
|
-
|
|
|
|
|
- # task_chain.results['report'] = report_result
|
|
|
|
|
-
|
|
|
|
|
- # 完成任务链
|
|
|
|
|
- task_chain.complete_processing()
|
|
|
|
|
-
|
|
|
|
|
- # 清理任务注册
|
|
|
|
|
|
|
+ # 10. 清理任务注册
|
|
|
asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
|
|
asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
|
|
|
- # 通知SSE连接任务完成
|
|
|
|
|
- asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id, task_chain.user_id))
|
|
|
|
|
|
|
|
|
|
- # 清理Redis文件缓存
|
|
|
|
|
- try:
|
|
|
|
|
- from foundation.utils.redis_utils import delete_file_info
|
|
|
|
|
- asyncio.run(delete_file_info(task_chain.file_id))
|
|
|
|
|
- logger.info(f"已清理Redis文件缓存: {task_chain.file_id}")
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.warning(f"清理Redis文件缓存失败: {str(e)}")
|
|
|
|
|
|
|
+ logger.info(f"施工方案审查任务已完成(LangGraph方案D)!")
|
|
|
|
|
+ logger.info(f"文件ID: {task_file_info.file_id}")
|
|
|
|
|
+ logger.info(f"文件名: {task_file_info.file_name}")
|
|
|
|
|
+ logger.info(f"整体状态: {result.get('overall_task_status', 'unknown')}")
|
|
|
|
|
+
|
|
|
|
|
+ # 构建可序列化的返回结果(移除不可序列化的对象)
|
|
|
|
|
+ serializable_result = {
|
|
|
|
|
+ "file_id": result.get("file_id"),
|
|
|
|
|
+ "callback_task_id": result.get("callback_task_id"),
|
|
|
|
|
+ "user_id": result.get("user_id"),
|
|
|
|
|
+ "file_name": result.get("file_name"),
|
|
|
|
|
+ "current_stage": result.get("current_stage"),
|
|
|
|
|
+ "overall_task_status": result.get("overall_task_status"),
|
|
|
|
|
+ "stage_status": result.get("stage_status"),
|
|
|
|
|
+ "error_message": result.get("error_message"),
|
|
|
|
|
+ # 注意:不包含 progress_manager, task_file_info, messages 等不可序列化对象
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- logger.info(f"文档处理任务链完成: {task_chain.callback_task_id}")
|
|
|
|
|
- return task_chain.results
|
|
|
|
|
|
|
+ return serializable_result
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
|
|
+ logger.error(f"提交文档处理任务失败: {str(e)}", exc_info=True)
|
|
|
|
|
+
|
|
|
# 标记任务失败
|
|
# 标记任务失败
|
|
|
- task_chain.fail_processing(str(e))
|
|
|
|
|
- logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")
|
|
|
|
|
|
|
+ if callback_task_id in self.active_chains:
|
|
|
|
|
+ self.active_chains[callback_task_id].fail_processing(str(e))
|
|
|
|
|
|
|
|
# 清理任务注册
|
|
# 清理任务注册
|
|
|
- asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
|
|
|
|
|
-
|
|
|
|
|
- # 清理Redis文件缓存(即使失败也清理)
|
|
|
|
|
- try:
|
|
|
|
|
- from foundation.utils.redis_utils import delete_file_info
|
|
|
|
|
- asyncio.run(delete_file_info(task_chain.file_id))
|
|
|
|
|
- logger.info(f"已清理Redis文件缓存: {task_chain.file_id}")
|
|
|
|
|
- except Exception as cleanup_error:
|
|
|
|
|
- logger.warning(f"清理Redis文件缓存失败: {str(cleanup_error)}")
|
|
|
|
|
|
|
+ asyncio.run(self.redis_duplicate_checker.unregister_task(task_file_info.file_id))
|
|
|
|
|
|
|
|
# 通知SSE连接任务失败
|
|
# 通知SSE连接任务失败
|
|
|
- error_result = {
|
|
|
|
|
|
|
+ error_data = {
|
|
|
"error": str(e),
|
|
"error": str(e),
|
|
|
"status": "failed",
|
|
"status": "failed",
|
|
|
|
|
+ "overall_task_status": "failed",
|
|
|
"timestamp": datetime.now().isoformat()
|
|
"timestamp": datetime.now().isoformat()
|
|
|
}
|
|
}
|
|
|
- current_data = {
|
|
|
|
|
- "status": "failed",
|
|
|
|
|
- "result": error_result
|
|
|
|
|
- }
|
|
|
|
|
- asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id, task_chain.user_id, current_data))
|
|
|
|
|
|
|
+ asyncio.run(self.progress_manager.complete_task(callback_task_id, task_file_info.user_id, error_data))
|
|
|
|
|
|
|
|
raise
|
|
raise
|
|
|
finally:
|
|
finally:
|
|
|
# 清理活跃任务
|
|
# 清理活跃任务
|
|
|
- if task_chain.callback_task_id in self.active_chains:
|
|
|
|
|
- del self.active_chains[task_chain.callback_task_id]
|
|
|
|
|
|
|
+ if callback_task_id in self.active_chains:
|
|
|
|
|
+ del self.active_chains[callback_task_id]
|
|
|
|
|
|
|
|
- # ==================== 任务终止管理方法 ====================
|
|
|
|
|
|
|
|
|
|
async def set_terminate_signal(self, callback_task_id: str, operator: str = "unknown") -> Dict[str, any]:
|
|
async def set_terminate_signal(self, callback_task_id: str, operator: str = "unknown") -> Dict[str, any]:
|
|
|
"""
|
|
"""
|
|
@@ -456,4 +409,502 @@ class WorkflowManager:
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
|
|
logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
|
|
|
- return None
|
|
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # ==================== LangGraph 任务链工作流(方案D)====================
|
|
|
|
|
+
|
|
|
|
|
def _build_task_chain_workflow(self) -> StateGraph:
    """Assemble and compile the LangGraph task-chain workflow.

    Registered nodes: ``start``, ``document_processing``,
    ``ai_review_subgraph``, ``report_generation``, ``complete``,
    ``error_handler`` and ``terminate``.

    Happy path::

        start -> document_processing -> ai_review_subgraph
              -> report_generation -> complete -> END

    After each of the three processing stages,
    ``_should_terminate_or_error_chain`` decides whether to continue to the
    next stage or divert to ``terminate`` / ``error_handler``. All three
    terminal nodes lead to ``END``.

    Returns:
        The object produced by ``StateGraph.compile()``.
        NOTE(review): the ``StateGraph`` annotation names the builder type,
        while the value returned is the compiled graph - confirm the
        intended annotation before tightening it.
    """
    logger.info("开始构建 LangGraph 任务链工作流图")

    graph = StateGraph(TaskChainState)

    # Node name -> coroutine implementing it.
    node_handlers = (
        ("start", self._start_chain_node),
        ("document_processing", self._document_processing_node),
        ("ai_review_subgraph", self._ai_review_subgraph_node),
        ("report_generation", self._report_generation_node),
        ("complete", self._complete_chain_node),
        ("error_handler", self._error_handler_chain_node),
        ("terminate", self._terminate_chain_node),
    )
    for node_name, handler in node_handlers:
        graph.add_node(node_name, handler)

    graph.set_entry_point("start")
    graph.add_edge("start", "document_processing")

    # Each processing stage shares the same router; only the "continue"
    # target differs from stage to stage.
    stage_successors = (
        ("document_processing", "ai_review_subgraph"),
        ("ai_review_subgraph", "report_generation"),
        ("report_generation", "complete"),
    )
    for stage, successor in stage_successors:
        graph.add_conditional_edges(
            stage,
            self._should_terminate_or_error_chain,
            {
                "terminate": "terminate",
                "error": "error_handler",
                "continue": successor,
            },
        )

    # Every terminal node finishes the run.
    for terminal_node in ("complete", "error_handler", "terminate"):
        graph.add_edge(terminal_node, END)

    compiled = graph.compile()

    logger.info("LangGraph 任务链工作流图构建完成")
    return compiled
|
|
|
|
|
+
|
|
|
|
|
async def _start_chain_node(self, state: TaskChainState) -> TaskChainState:
    """Entry node of the task chain.

    Logs the start of the run and returns a state update that marks the
    overall task as ``processing`` with all three stages ``pending``.

    Args:
        state: Current task-chain state; only ``callback_task_id`` is read.

    Returns:
        A partial state update (stage, overall status, per-stage status and
        one ``AIMessage``).
    """
    logger.info(f"任务链工作流启动: {state['callback_task_id']}")

    # All stages start out pending; key order matches document -> review -> report.
    pending_stages = dict.fromkeys(("document", "ai_review", "report"), "pending")

    update = {
        "current_stage": "start",
        "overall_task_status": "processing",
        "stage_status": pending_stages,
        "messages": [AIMessage(content="任务链工作流启动")],
    }
    return update
|
|
|
|
|
+
|
|
|
|
|
async def _document_processing_node(self, state: TaskChainState) -> TaskChainState:
    """Run the document-processing stage.

    Checks the terminate signal first; if it is set, the stage is marked
    ``terminated`` and no processing happens. Otherwise a
    ``DocumentWorkflow`` is built from the state and executed with the
    file content and file type.

    Any exception raised inside the stage is logged and converted into a
    ``failed`` state update instead of propagating, so the graph router can
    divert to the error handler.

    Args:
        state: Current task-chain state.

    Returns:
        A partial state update; on success it carries ``document_result``,
        on failure it carries ``error_message``.
    """
    stage_name = "document_processing"
    try:
        task_id = state["callback_task_id"]
        logger.info(f"开始文档处理阶段: {task_id}")

        # Bail out early when termination has been requested.
        terminate_requested = await self.check_terminate_signal(task_id)
        if terminate_requested:
            logger.warning(f"文档处理阶段检测到终止信号: {task_id}")
            return {
                "current_stage": stage_name,
                "overall_task_status": "terminated",
                "stage_status": dict(state["stage_status"], document="terminated"),
                "messages": [AIMessage(content="文档处理阶段检测到终止信号")],
            }

        workflow = DocumentWorkflow(
            task_file_info=state["task_file_info"],
            progress_manager=state["progress_manager"],
            redis_duplicate_checker=self.redis_duplicate_checker,
        )
        doc_result = await workflow.execute(state["file_content"], state["file_type"])

        logger.info(f"文档处理完成: {task_id}")

        return {
            "current_stage": stage_name,
            "overall_task_status": "processing",
            "stage_status": dict(state["stage_status"], document="completed"),
            "document_result": doc_result,
            "messages": [AIMessage(content="文档处理完成")],
        }

    except Exception as e:
        # Failures become state so the conditional edge can route to error_handler.
        failure_text = f"文档处理失败: {str(e)}"
        logger.error(failure_text, exc_info=True)
        return {
            "current_stage": stage_name,
            "overall_task_status": "failed",
            "stage_status": dict(state["stage_status"], document="failed"),
            "error_message": failure_text,
            "messages": [AIMessage(content=failure_text)],
        }
|
|
|
|
|
+
|
|
|
|
|
async def _ai_review_subgraph_node(self, state: TaskChainState) -> TaskChainState:
    """Run the AI-review stage by delegating to ``AIReviewWorkflow``.

    Steps:
      1. Check the terminate signal; if set, mark the stage ``terminated``.
      2. Take ``structured_content`` from the document-processing result.
      3. Read ``MAX_REVIEW_UNITS`` and ``REVIEW_MODE`` from the
         ``[ai_review]`` section of ``config/config.ini``.
      4. Build and execute ``AIReviewWorkflow``.

    Any exception is logged and converted into a ``failed`` state update
    instead of propagating, so the graph router can divert to the error
    handler.

    Args:
        state: Current task-chain state.

    Returns:
        A partial state update; on success it carries ``ai_review_result``,
        on failure it carries ``error_message``.
    """
    try:
        logger.info(f"开始AI审查阶段: {state['callback_task_id']}")

        # Bail out early when termination has been requested.
        if await self.check_terminate_signal(state["callback_task_id"]):
            logger.warning(f"AI审查阶段检测到终止信号: {state['callback_task_id']}")
            return {
                "current_stage": "ai_review",
                "overall_task_status": "terminated",
                "stage_status": {**state["stage_status"], "ai_review": "terminated"},
                "messages": [AIMessage(content="AI审查阶段检测到终止信号")]
            }

        # document_result starts out as None in the initial state; guard it
        # explicitly so a missing result yields a clear error rather than an
        # AttributeError on None.
        document_result = state.get("document_result")
        if not document_result:
            raise ValueError("文档处理结果缺失,无法执行AI审查")

        structured_content = document_result.get("structured_content")
        if not structured_content:
            raise ValueError("文档处理结果中缺少结构化内容")

        task_file_info = state["task_file_info"]

        # Read the AI-review settings. The path is relative to the process
        # working directory; a missing file or section falls back to defaults.
        import configparser
        config = configparser.ConfigParser()
        config.read('config/config.ini', encoding='utf-8')

        max_review_units = config.getint('ai_review', 'MAX_REVIEW_UNITS', fallback=None)
        if max_review_units == 0:
            # 0 is normalised to None, i.e. no explicit limit is passed on.
            max_review_units = None
        review_mode = config.get('ai_review', 'REVIEW_MODE', fallback='all')

        logger.info(f"AI审查配置: 最大审查数量={max_review_units}, 审查模式={review_mode}")

        ai_workflow = AIReviewWorkflow(
            task_file_info=task_file_info,
            structured_content=structured_content,
            progress_manager=state["progress_manager"],
            max_review_units=max_review_units,
            review_mode=review_mode
        )

        ai_result = await ai_workflow.execute()

        logger.info(f"AI审查完成: {state['callback_task_id']}")

        return {
            "current_stage": "ai_review",
            "overall_task_status": "processing",
            "stage_status": {**state["stage_status"], "ai_review": "completed"},
            "ai_review_result": ai_result,
            "messages": [AIMessage(content="AI审查完成")]
        }

    except Exception as e:
        # Failures become state so the conditional edge can route to error_handler.
        failure_text = f"AI审查失败: {str(e)}"
        logger.error(failure_text, exc_info=True)
        return {
            "current_stage": "ai_review",
            "overall_task_status": "failed",
            "stage_status": {**state["stage_status"], "ai_review": "failed"},
            "error_message": failure_text,
            "messages": [AIMessage(content=failure_text)]
        }
|
|
|
|
|
+
|
|
|
|
|
async def _report_generation_node(self, state: TaskChainState) -> TaskChainState:
    """Run the report-generation stage and persist the combined results.

    Checks the terminate signal, requires a non-empty ``ai_review_result``,
    executes ``ReportWorkflow`` and then calls ``_save_complete_results``
    with the report. A failure while saving is treated as a failure of
    this stage, because the call sits inside the same ``try``.

    Any exception is logged and converted into a ``failed`` state update
    instead of propagating, so the graph router can divert to the error
    handler.

    Args:
        state: Current task-chain state.

    Returns:
        A partial state update; on success it carries ``report_result``,
        on failure it carries ``error_message``.
    """
    try:
        logger.info(f"开始报告生成阶段: {state['callback_task_id']}")

        # Bail out early when termination has been requested.
        if await self.check_terminate_signal(state["callback_task_id"]):
            logger.warning(f"报告生成阶段检测到终止信号: {state['callback_task_id']}")
            return {
                "current_stage": "report_generation",
                "overall_task_status": "terminated",
                "stage_status": {**state["stage_status"], "report": "terminated"},
                "messages": [AIMessage(content="报告生成阶段检测到终止信号")]
            }

        ai_review_result = state.get("ai_review_result")
        if not ai_review_result:
            raise ValueError("AI审查结果缺失,无法生成报告")

        report_workflow = ReportWorkflow(
            file_id=state["file_id"],
            file_name=state["file_name"],
            callback_task_id=state["callback_task_id"],
            user_id=state["user_id"],
            ai_review_results=ai_review_result,
            progress_manager=state["progress_manager"]
        )

        report_result = await report_workflow.execute()

        logger.info(f"报告生成完成: {state['callback_task_id']}")

        # The state handed to this node still lists the report stage with its
        # pre-run status; persist a snapshot in which the stage is already
        # completed so the saved file agrees with the report it contains.
        completed_stage_status = {**state["stage_status"], "report": "completed"}
        snapshot = {**state, "stage_status": completed_stage_status}
        await self._save_complete_results(snapshot, report_result)

        return {
            "current_stage": "report_generation",
            "overall_task_status": "processing",
            "stage_status": completed_stage_status,
            "report_result": report_result,
            "messages": [AIMessage(content="报告生成完成")]
        }

    except Exception as e:
        # Failures become state so the conditional edge can route to error_handler.
        failure_text = f"报告生成失败: {str(e)}"
        logger.error(failure_text, exc_info=True)
        return {
            "current_stage": "report_generation",
            "overall_task_status": "failed",
            "stage_status": {**state["stage_status"], "report": "failed"},
            "error_message": failure_text,
            "messages": [AIMessage(content=failure_text)]
        }
|
|
|
|
|
+
|
|
|
|
|
async def _complete_chain_node(self, state: TaskChainState) -> TaskChainState:
    """Final node of the successful path.

    Notifies the progress manager (when one is present in the state) that
    the whole task is ``completed``, then removes the cached file info on a
    best-effort basis: a cleanup failure is only logged as a warning.

    This is the only node that sets ``overall_task_status`` to
    ``completed``; the graph wiring reaches it only after the
    report-generation stage routes to "continue".

    Args:
        state: Current task-chain state.

    Returns:
        A partial state update marking the chain as completed.
    """
    logger.info(f"任务链工作流完成: {state['callback_task_id']}")

    progress_manager = state["progress_manager"]
    if progress_manager:
        completion_payload = {"overall_task_status": "completed", "message": "所有阶段已完成"}
        await progress_manager.complete_task(
            state["callback_task_id"],
            state["user_id"],
            completion_payload,
        )

    # Best-effort cache cleanup; never fails the chain.
    try:
        from foundation.utils.redis_utils import delete_file_info
        file_id = state["file_id"]
        await delete_file_info(file_id)
        logger.info(f"已清理 Redis 文件缓存: {file_id}")
    except Exception as e:
        logger.warning(f"清理 Redis 文件缓存失败: {str(e)}")

    return {
        "current_stage": "complete",
        "overall_task_status": "completed",
        "messages": [AIMessage(content="任务链工作流完成")],
    }
|
|
|
|
|
+
|
|
|
|
|
async def _error_handler_chain_node(self, state: TaskChainState) -> TaskChainState:
    """Terminal node for a failed task chain.

    Logs the error, notifies the progress manager (when one is present in
    the state) with a ``failed`` payload, and removes the cached file info
    on a best-effort basis: a cleanup failure is only logged as a warning.

    Args:
        state: Current task-chain state.

    Returns:
        A partial state update marking the chain as failed.
    """
    # The initial state stores error_message=None explicitly, so a plain
    # .get(key, default) would return None rather than the placeholder;
    # fall back with `or` to cover both a missing and an empty value.
    error_text = state.get("error_message") or "未知错误"

    logger.error(f"任务链工作流错误: {state['callback_task_id']}, 错误: {error_text}")

    if state["progress_manager"]:
        error_data = {
            "overall_task_status": "failed",
            "error": error_text,
            "status": "failed",
            "timestamp": datetime.now().isoformat()
        }
        await state["progress_manager"].complete_task(
            state["callback_task_id"],
            state["user_id"],
            error_data
        )

    # Best-effort cache cleanup, performed on failure as well.
    try:
        from foundation.utils.redis_utils import delete_file_info
        await delete_file_info(state["file_id"])
        logger.info(f"已清理 Redis 文件缓存: {state['file_id']}")
    except Exception as e:
        logger.warning(f"清理 Redis 文件缓存失败: {str(e)}")

    return {
        "current_stage": "error_handler",
        "overall_task_status": "failed",
        "messages": [AIMessage(content=f"任务链错误: {error_text}")]
    }
|
|
|
|
|
+
|
|
|
|
|
async def _terminate_chain_node(self, state: TaskChainState) -> TaskChainState:
    """Terminal node for a task chain that was asked to stop.

    Notifies the progress manager (when one is present in the state) that
    the task is ``terminated``, clears the terminate signal, and removes
    the cached file info on a best-effort basis: a cleanup failure is only
    logged as a warning. Clearing the terminate signal is not guarded, so
    an exception there propagates to the caller.

    Args:
        state: Current task-chain state.

    Returns:
        A partial state update marking the chain as terminated.
    """
    task_id = state["callback_task_id"]
    logger.warning(f"任务链工作流已终止: {task_id}")

    progress_manager = state["progress_manager"]
    if progress_manager:
        termination_payload = {"overall_task_status": "terminated", "message": "任务已被用户终止"}
        await progress_manager.complete_task(
            state["callback_task_id"],
            state["user_id"],
            termination_payload,
        )

    # Remove the signal that triggered this termination.
    await self.clear_terminate_signal(state["callback_task_id"])

    # Best-effort cache cleanup; never fails the chain.
    try:
        from foundation.utils.redis_utils import delete_file_info
        file_id = state["file_id"]
        await delete_file_info(file_id)
        logger.info(f"已清理 Redis 文件缓存: {file_id}")
    except Exception as e:
        logger.warning(f"清理 Redis 文件缓存失败: {str(e)}")

    return {
        "current_stage": "terminated",
        "overall_task_status": "terminated",
        "messages": [AIMessage(content="任务链已被终止")],
    }
|
|
|
|
|
+
|
|
|
|
|
+ def _should_terminate_or_error_chain(self, state: TaskChainState) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ 检查任务链是否应该终止或发生错误
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ state: 任务链状态
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ str: "terminate", "error", 或 "continue"
|
|
|
|
|
+
|
|
|
|
|
+ Note:
|
|
|
|
|
+ 这是条件边判断方法,用于决定工作流的下一步走向
|
|
|
|
|
+ 1. 优先检查终止信号
|
|
|
|
|
+ 2. 检查是否有错误
|
|
|
|
|
+ 3. 都没有则继续执行
|
|
|
|
|
+ """
|
|
|
|
|
+ # 检查终止状态
|
|
|
|
|
+ if state.get("overall_task_status") == "terminated":
|
|
|
|
|
+ return "terminate"
|
|
|
|
|
+
|
|
|
|
|
+ # 检查错误状态
|
|
|
|
|
+ if state.get("overall_task_status") == "failed" or state.get("error_message"):
|
|
|
|
|
+ return "error"
|
|
|
|
|
+
|
|
|
|
|
+ # 默认继续执行
|
|
|
|
|
+ return "continue"
|
|
|
|
|
+
|
|
|
|
|
async def _save_complete_results(self, state: TaskChainState, report_result: Dict[str, Any]):
    """Persist the combined results of all stages to a JSON file.

    Writes ``temp/<callback_task_id>.json`` (relative to the process
    working directory) containing the task identifiers, the per-stage
    status, the document and AI-review results taken from ``state`` and the
    supplied report result. ``overall_task_status`` is written as
    ``"processing"`` because the chain is only marked completed later, in
    the completion node.

    The payload is serialised to a string before the file is opened, so a
    serialisation error cannot leave a truncated file behind.

    NOTE(review): the file write is synchronous and runs inside a
    coroutine; large results will block the event loop while writing.

    Args:
        state: Current task-chain state.
        report_result: Result returned by the report workflow.

    Raises:
        Exception: Any error raised while building, serialising or writing
            the results is logged and re-raised.
    """
    try:
        import json
        import os

        logger.info(f"开始保存完整结果: {state['callback_task_id']}")

        temp_dir = "temp"
        os.makedirs(temp_dir, exist_ok=True)

        complete_results = {
            "callback_task_id": state["callback_task_id"],
            "file_id": state["file_id"],
            "file_name": state["file_name"],
            "user_id": state["user_id"],
            # Still in progress here; the completion node marks it completed.
            "overall_task_status": "processing",
            "stage_status": state["stage_status"],
            "document_result": state.get("document_result"),
            "ai_review_result": state.get("ai_review_result"),
            "report_result": report_result,
            "timestamp": datetime.now().isoformat()
        }

        # Serialise first: if any value is not JSON-serialisable this raises
        # before the target file is created or truncated.
        serialized = json.dumps(complete_results, ensure_ascii=False, indent=2)

        file_path = os.path.join(temp_dir, f"{state['callback_task_id']}.json")
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(serialized)

        logger.info(f"完整结果已保存到: {file_path}")

    except Exception as e:
        logger.error(f"保存完整结果失败: {str(e)}", exc_info=True)
        raise
|