workflow_manager.py 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365
  1. """
  2. 基于LangGraph的工作流管理器
  3. 负责任务的创建、编排和执行,使用LangGraph进行状态管理
  4. 新增功能:
  5. - 任务终止管理
  6. - 终止信号设置和检测
  7. """
  8. import asyncio
  9. import time
  10. import json
  11. from typing import Dict, Optional, Any
  12. from datetime import datetime
  13. from langgraph.graph import StateGraph, END
  14. from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
  15. from foundation.observability.logger.loggering import review_logger as logger
  16. from foundation.observability.monitoring.time_statistics import track_execution_time
  17. from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
  18. from .progress_manager import ProgressManager
  19. from .redis_duplicate_checker import RedisDuplicateChecker
  20. from .task_models import TaskFileInfo, TaskChain
  21. from ..construction_review.workflows import DocumentWorkflow, AIReviewWorkflow, ReportWorkflow
  22. from ..construction_review.workflows.types import TaskChainState
  23. class ProgressManagerRegistry:
  24. """ProgressManager注册表 - 为每个任务管理独立的ProgressManager实例"""
  25. _registry = {} # {callback_task_id: ProgressManager}
  26. @classmethod
  27. def register_progress_manager(cls, callback_task_id: str, progress_manager: ProgressManager):
  28. """注册ProgressManager实例"""
  29. cls._registry[callback_task_id] = progress_manager
  30. logger.info(f"注册ProgressManager实例: {callback_task_id}, ID: {id(progress_manager)}")
  31. @classmethod
  32. def get_progress_manager(cls, callback_task_id: str) -> ProgressManager:
  33. """获取ProgressManager实例"""
  34. return cls._registry.get(callback_task_id)
  35. @classmethod
  36. def unregister_progress_manager(cls, callback_task_id: str):
  37. """注销ProgressManager实例"""
  38. if callback_task_id in cls._registry:
  39. del cls._registry[callback_task_id]
  40. logger.info(f"注销ProgressManager实例: {callback_task_id}")
  41. class WorkflowManager:
  42. """工作流管理器"""
  43. def __init__(self, max_concurrent_docs: int = 5, max_concurrent_reviews: int = 10):
  44. self.max_concurrent_docs = max_concurrent_docs
  45. self.max_concurrent_reviews = max_concurrent_reviews
  46. # 并发控制
  47. self.doc_semaphore = asyncio.Semaphore(max_concurrent_docs)
  48. self.review_semaphore = asyncio.Semaphore(max_concurrent_reviews)
  49. # 服务组件
  50. self.progress_manager = ProgressManager()
  51. self.redis_duplicate_checker = RedisDuplicateChecker()
  52. # 活跃任务跟踪
  53. self.active_chains: Dict[str, TaskChain] = {}
  54. self._cleanup_task_started = False
  55. # 任务终止管理
  56. self._terminate_signal_prefix = "ai_review:terminate_signal:"
  57. self._task_expire_time = 7200 # 2小时
  58. # LangGraph 任务链工作流(方案D)
  59. self.task_chain_graph = None # 延迟初始化,避免循环导入
  60. # ==================== 施工方案编写任务管理 ====================
  61. # 大纲生成活跃任务跟踪
  62. self.active_outline_tasks: Dict[str, Any] = {}
  63. # 大纲生成任务 Redis 前缀
  64. self._outline_result_prefix = "outline_write:result:"
  65. self._outline_terminate_signal_prefix = "outline_write:terminate_signal:"
  66. # 大纲生成工作流图(延迟初始化)
  67. self.outline_generation_graph = None
  68. async def submit_task_processing(self, file_info: dict) -> str:
  69. """异步提交任务处理(用于file_upload层)"""
  70. from foundation.infrastructure.messaging.tasks import submit_task_processing_task
  71. from foundation.infrastructure.tracing.celery_trace import CeleryTraceManager
  72. try:
  73. logger.info(f"提交文档处理任务到Celery: {file_info['file_id']}")
  74. # 使用CeleryTraceManager提交任务,自动传递trace_id
  75. task = CeleryTraceManager.submit_celery_task(
  76. submit_task_processing_task,
  77. file_info
  78. )
  79. logger.info(f"Celery任务已提交,Task ID: {task.id}")
  80. return task.id
  81. except Exception as e:
  82. logger.error(f"提交Celery任务失败: {str(e)}")
  83. raise
  84. @track_execution_time
  85. def submit_construction_review_task_processing_sync(self, file_info: dict) -> dict:
  86. """
  87. 同步提交施工审查任务处理(用于Celery worker)
  88. Note:
  89. 已切换到 LangGraph 任务链工作流(方案D)
  90. 使用统一的状态管理和嵌套子图架构
  91. """
  92. try:
  93. logger.info(f"提交文档处理任务(LangGraph方案D): {file_info['file_id']}")
  94. # 1. 创建TaskFileInfo对象(封装任务文件信息)
  95. task_file_info = TaskFileInfo(file_info)
  96. logger.info(f"创建任务文件信息: {task_file_info}")
  97. # 2. 生成任务链ID
  98. callback_task_id = task_file_info.callback_task_id
  99. # 3. 创建任务链(引用 TaskFileInfo,避免数据重复)
  100. task_chain = TaskChain(task_file_info)
  101. # 4. 标记任务开始
  102. task_chain.start_processing()
  103. # 5. 添加到活跃任务跟踪
  104. self.active_chains[callback_task_id] = task_chain
  105. # 6. 初始化进度管理
  106. asyncio.run(self.progress_manager.initialize_progress(
  107. callback_task_id=callback_task_id,
  108. user_id=task_file_info.user_id,
  109. stages=[]
  110. ))
  111. # 7. 构建 LangGraph 任务链工作流(延迟初始化)
  112. if self.task_chain_graph is None:
  113. self.task_chain_graph = self._build_task_chain_workflow()
  114. # 8. 构建初始状态
  115. initial_state = TaskChainState(
  116. file_id=task_file_info.file_id,
  117. callback_task_id=callback_task_id,
  118. user_id=task_file_info.user_id,
  119. file_name=task_file_info.file_name,
  120. file_type=task_file_info.file_type,
  121. file_content=task_file_info.file_content,
  122. current_stage="start",
  123. overall_task_status="processing",
  124. stage_status={
  125. "document": "pending",
  126. "ai_review": "pending",
  127. "report": "pending"
  128. },
  129. document_result=None,
  130. ai_review_result=None,
  131. report_result=None,
  132. error_message=None,
  133. progress_manager=self.progress_manager,
  134. task_file_info=task_file_info,
  135. messages=[HumanMessage(content=f"开始任务链: {task_file_info.file_id}")]
  136. )
  137. # 9. 执行 LangGraph 任务链工作流
  138. loop = asyncio.new_event_loop()
  139. asyncio.set_event_loop(loop)
  140. result = loop.run_until_complete(self.task_chain_graph.ainvoke(initial_state))
  141. loop.close()
  142. # 10. 清理任务注册
  143. asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
  144. logger.info(f"施工方案审查任务已完成(LangGraph方案D)!")
  145. logger.info(f"文件ID: {task_file_info.file_id}")
  146. logger.info(f"文件名: {task_file_info.file_name}")
  147. logger.info(f"整体状态: {result.get('overall_task_status', 'unknown')}")
  148. # 构建可序列化的返回结果(移除不可序列化的对象)
  149. serializable_result = {
  150. "file_id": result.get("file_id"),
  151. "callback_task_id": result.get("callback_task_id"),
  152. "user_id": result.get("user_id"),
  153. "file_name": result.get("file_name"),
  154. "current_stage": result.get("current_stage"),
  155. "overall_task_status": result.get("overall_task_status"),
  156. "stage_status": result.get("stage_status"),
  157. "error_message": result.get("error_message"),
  158. # 注意:不包含 progress_manager, task_file_info, messages 等不可序列化对象
  159. }
  160. return serializable_result
  161. except Exception as e:
  162. logger.error(f"提交文档处理任务失败: {str(e)}", exc_info=True)
  163. # 标记任务失败
  164. if callback_task_id in self.active_chains:
  165. self.active_chains[callback_task_id].fail_processing(str(e))
  166. # 清理任务注册
  167. asyncio.run(self.redis_duplicate_checker.unregister_task(task_file_info.file_id))
  168. # 通知SSE连接任务失败
  169. error_data = {
  170. "error": str(e),
  171. "status": "failed",
  172. "overall_task_status": "failed",
  173. "timestamp": datetime.now().isoformat()
  174. }
  175. asyncio.run(self.progress_manager.complete_task(callback_task_id, task_file_info.user_id, error_data))
  176. raise
  177. finally:
  178. # 清理活跃任务
  179. if callback_task_id in self.active_chains:
  180. del self.active_chains[callback_task_id]
  181. async def set_terminate_signal(self, callback_task_id: str, operator: str = "unknown") -> Dict[str, any]:
  182. """
  183. 设置任务终止信号
  184. Args:
  185. callback_task_id: 任务回调ID
  186. operator: 操作人(用户ID或系统标识)
  187. Returns:
  188. Dict: 操作结果 {"success": bool, "message": str, "task_info": dict}
  189. Note:
  190. 将终止信号写入 Redis,支持跨进程检测
  191. AI审查节点在执行前会检查此信号
  192. """
  193. try:
  194. # 检查任务是否在活跃列表中
  195. if callback_task_id not in self.active_chains:
  196. return {
  197. "success": False,
  198. "message": f"任务不存在或已完成: {callback_task_id}",
  199. "task_info": None
  200. }
  201. task_chain = self.active_chains[callback_task_id]
  202. # 检查任务状态
  203. if task_chain.status != "processing":
  204. return {
  205. "success": False,
  206. "message": f"任务状态不是 processing,无需终止: {callback_task_id} (当前状态: {task_chain.status})",
  207. "task_info": {
  208. "callback_task_id": callback_task_id,
  209. "status": task_chain.status,
  210. "file_name": task_chain.file_name
  211. }
  212. }
  213. # 设置 Redis 终止信号
  214. redis_client = await RedisConnectionFactory.get_connection()
  215. terminate_key = f"{self._terminate_signal_prefix}{callback_task_id}"
  216. # 存储终止信号和操作人、时间
  217. terminate_data = {
  218. "operator": operator,
  219. "terminate_time": str(time.time()),
  220. "task_id": callback_task_id
  221. }
  222. # 使用 hash 存储更多信息
  223. await redis_client.hset(terminate_key, mapping=terminate_data)
  224. # 设置过期时间(2小时)
  225. await redis_client.expire(terminate_key, self._task_expire_time)
  226. logger.info(f"已设置终止信号: {callback_task_id} (操作人: {operator}, 文件: {task_chain.file_name})")
  227. return {
  228. "success": True,
  229. "message": f"终止信号已设置,任务将在当前节点完成后终止",
  230. "task_info": {
  231. "callback_task_id": callback_task_id,
  232. "file_id": task_chain.file_id,
  233. "file_name": task_chain.file_name,
  234. "user_id": task_chain.user_id,
  235. "status": task_chain.status,
  236. "current_stage": task_chain.current_stage
  237. }
  238. }
  239. except Exception as e:
  240. logger.error(f"设置终止信号失败: {str(e)}", exc_info=True)
  241. return {
  242. "success": False,
  243. "message": f"设置终止信号失败: {str(e)}",
  244. "task_info": None
  245. }
  246. async def check_terminate_signal(self, callback_task_id: str) -> bool:
  247. """
  248. 检查是否有终止信号
  249. Args:
  250. callback_task_id: 任务回调ID
  251. Returns:
  252. bool: 有终止信号返回 True
  253. Note:
  254. 从 Redis 读取终止信号
  255. 工作流节点在执行前调用此方法检查是否需要终止
  256. """
  257. try:
  258. redis_client = await RedisConnectionFactory.get_connection()
  259. terminate_key = f"{self._terminate_signal_prefix}{callback_task_id}"
  260. # 检查键是否存在
  261. exists = await redis_client.exists(terminate_key)
  262. if exists:
  263. # 读取终止信息
  264. terminate_info = await redis_client.hgetall(terminate_key)
  265. logger.warning(f"检测到终止信号: {callback_task_id}, 操作人: {terminate_info.get(b'operator', b'unknown').decode()}")
  266. return True
  267. return False
  268. except RuntimeError as e:
  269. # 事件循环相关的错误处理
  270. error_msg = str(e)
  271. if "Event loop is closed" in error_msg:
  272. # 事件循环关闭是正常情况(任务结束),不记录错误
  273. logger.debug(f"检查终止信号时事件循环已关闭: {callback_task_id}")
  274. return False
  275. elif "bound to a different event loop" in error_msg:
  276. # 事件循环不匹配,记录警告但不中断流程
  277. logger.warning(f"检查终止信号时检测到事件循环不匹配: {callback_task_id},将忽略本次检查")
  278. return False
  279. else:
  280. # 其他 RuntimeError 记录错误
  281. logger.error(f"检查终止信号失败(RuntimeError): {error_msg}", exc_info=True)
  282. return False
  283. except Exception as e:
  284. # 其他异常仍然记录错误
  285. logger.error(f"检查终止信号失败: {str(e)}", exc_info=True)
  286. return False
  287. async def clear_terminate_signal(self, callback_task_id: str):
  288. """
  289. 清理 Redis 中的终止信号
  290. Args:
  291. callback_task_id: 任务回调ID
  292. """
  293. try:
  294. redis_client = await RedisConnectionFactory.get_connection()
  295. terminate_key = f"{self._terminate_signal_prefix}{callback_task_id}"
  296. await redis_client.delete(terminate_key)
  297. logger.debug(f"清理终止信号: {callback_task_id}")
  298. except Exception as e:
  299. logger.warning(f"清理终止信号失败: {str(e)}")
  300. async def get_active_tasks(self) -> list:
  301. """
  302. 获取活跃任务列表
  303. Returns:
  304. list: 活跃任务信息列表
  305. """
  306. try:
  307. active_tasks = []
  308. current_time = time.time()
  309. for task_id, task_chain in self.active_chains.items():
  310. if task_chain.status == "processing":
  311. task_info = {
  312. "callback_task_id": task_id,
  313. "file_id": task_chain.file_id,
  314. "file_name": task_chain.file_name,
  315. "user_id": task_chain.user_id,
  316. "status": task_chain.status,
  317. "current_stage": task_chain.current_stage,
  318. "start_time": task_chain.start_time,
  319. "running_duration": int(current_time - task_chain.start_time) if task_chain.start_time else 0
  320. }
  321. active_tasks.append(task_info)
  322. return active_tasks
  323. except Exception as e:
  324. logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
  325. return []
  326. async def get_task_info(self, callback_task_id: str) -> Optional[Dict]:
  327. """
  328. 获取任务信息
  329. Args:
  330. callback_task_id: 任务回调ID
  331. Returns:
  332. Optional[Dict]: 任务信息字典,不存在返回 None
  333. """
  334. try:
  335. task_chain = self.active_chains.get(callback_task_id)
  336. if task_chain:
  337. current_time = time.time()
  338. return {
  339. "callback_task_id": callback_task_id,
  340. "file_id": task_chain.file_id,
  341. "file_name": task_chain.file_name,
  342. "user_id": task_chain.user_id,
  343. "status": task_chain.status,
  344. "current_stage": task_chain.current_stage,
  345. "start_time": task_chain.start_time,
  346. "running_duration": int(current_time - task_chain.start_time) if task_chain.start_time else 0,
  347. "results": task_chain.results
  348. }
  349. return None
  350. except Exception as e:
  351. logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
  352. return None
  353. def _build_task_chain_workflow(self) -> StateGraph:
  354. """
  355. 构建 LangGraph 任务链工作流图(方案D)
  356. Returns:
  357. StateGraph: 配置完成的 LangGraph 任务链图实例
  358. Note:
  359. 创建包含文档处理、AI审查(嵌套子图)、报告生成的完整任务链
  360. 设置节点间的转换关系和条件边,支持终止检查和错误处理
  361. 工作流路径: start → document_processing → ai_review_subgraph → report_generation → complete → END
  362. """
  363. logger.info("开始构建 LangGraph 任务链工作流图")
  364. workflow = StateGraph(TaskChainState)
  365. # 添加节点
  366. workflow.add_node("start", self._start_chain_node)
  367. workflow.add_node("document_processing", self._document_processing_node)
  368. workflow.add_node("ai_review_subgraph", self._ai_review_subgraph_node)
  369. workflow.add_node("report_generation", self._report_generation_node)
  370. workflow.add_node("complete", self._complete_chain_node)
  371. workflow.add_node("error_handler", self._error_handler_chain_node)
  372. workflow.add_node("terminate", self._terminate_chain_node)
  373. # 设置入口点
  374. workflow.set_entry_point("start")
  375. # 添加边和条件边
  376. workflow.add_edge("start", "document_processing")
  377. # 文档处理后检查终止信号
  378. workflow.add_conditional_edges(
  379. "document_processing",
  380. self._should_terminate_or_error_chain,
  381. {
  382. "terminate": "terminate",
  383. "error": "error_handler",
  384. "continue": "ai_review_subgraph"
  385. }
  386. )
  387. # AI审查后检查终止信号
  388. workflow.add_conditional_edges(
  389. "ai_review_subgraph",
  390. self._should_terminate_or_error_chain,
  391. {
  392. "terminate": "terminate",
  393. "error": "error_handler",
  394. "continue": "report_generation"
  395. }
  396. )
  397. # 报告生成后检查终止信号
  398. workflow.add_conditional_edges(
  399. "report_generation",
  400. self._should_terminate_or_error_chain,
  401. {
  402. "terminate": "terminate",
  403. "error": "error_handler",
  404. "continue": "complete"
  405. }
  406. )
  407. # 完成节点直接结束
  408. workflow.add_edge("complete", END)
  409. workflow.add_edge("error_handler", END)
  410. workflow.add_edge("terminate", END)
  411. # 编译工作流图
  412. compiled_graph = workflow.compile()
  413. logger.info("LangGraph 任务链工作流图构建完成")
  414. return compiled_graph
  415. async def _start_chain_node(self, state: TaskChainState) -> TaskChainState:
  416. """
  417. 任务链开始节点
  418. Args:
  419. state: 任务链状态
  420. Returns:
  421. TaskChainState: 更新后的状态
  422. """
  423. logger.info(f"任务链工作流启动: {state['callback_task_id']}")
  424. return {
  425. "current_stage": "start",
  426. "overall_task_status": "processing",
  427. "stage_status": {
  428. "document": "pending",
  429. "ai_review": "pending",
  430. "report": "pending"
  431. },
  432. "messages": [AIMessage(content="任务链工作流启动")]
  433. }
  434. async def _document_processing_node(self, state: TaskChainState) -> TaskChainState:
  435. """
  436. 文档处理节点
  437. Args:
  438. state: 任务链状态
  439. Returns:
  440. TaskChainState: 更新后的状态,包含文档处理结果
  441. """
  442. try:
  443. logger.info(f"开始文档处理阶段: {state['callback_task_id']}")
  444. # 检查终止信号
  445. if await self.check_terminate_signal(state["callback_task_id"]):
  446. logger.warning(f"文档处理阶段检测到终止信号: {state['callback_task_id']}")
  447. return {
  448. "current_stage": "document_processing",
  449. "overall_task_status": "terminated",
  450. "stage_status": {**state["stage_status"], "document": "terminated"},
  451. "messages": [AIMessage(content="文档处理阶段检测到终止信号")]
  452. }
  453. # 获取 TaskFileInfo 实例
  454. task_file_info = state["task_file_info"]
  455. # 创建文档工作流实例
  456. document_workflow = DocumentWorkflow(
  457. task_file_info=task_file_info,
  458. progress_manager=state["progress_manager"],
  459. redis_duplicate_checker=self.redis_duplicate_checker
  460. )
  461. # 执行文档处理
  462. doc_result = await document_workflow.execute(
  463. state["file_content"],
  464. state["file_type"]
  465. )
  466. logger.info(f"文档处理完成: {state['callback_task_id']}")
  467. return {
  468. "current_stage": "document_processing",
  469. "overall_task_status": "processing",
  470. "stage_status": {**state["stage_status"], "document": "completed"},
  471. "document_result": doc_result,
  472. "messages": [AIMessage(content="文档处理完成")]
  473. }
  474. except Exception as e:
  475. logger.error(f"文档处理失败: {str(e)}", exc_info=True)
  476. return {
  477. "current_stage": "document_processing",
  478. "overall_task_status": "failed",
  479. "stage_status": {**state["stage_status"], "document": "failed"},
  480. "error_message": f"文档处理失败: {str(e)}",
  481. "messages": [AIMessage(content=f"文档处理失败: {str(e)}")]
  482. }
  483. async def _ai_review_subgraph_node(self, state: TaskChainState) -> TaskChainState:
  484. """
  485. AI审查子图节点(嵌套现有的 AIReviewWorkflow)
  486. Args:
  487. state: 任务链状态
  488. Returns:
  489. TaskChainState: 更新后的状态,包含AI审查结果
  490. Note:
  491. 这是方案D的核心实现:将现有的 AIReviewWorkflow 作为子图嵌套
  492. 无需修改 AIReviewWorkflow 的代码,保持其独立性
  493. """
  494. try:
  495. logger.info(f"开始AI审查阶段: {state['callback_task_id']}")
  496. # 检查终止信号
  497. if await self.check_terminate_signal(state["callback_task_id"]):
  498. logger.warning(f"AI审查阶段检测到终止信号: {state['callback_task_id']}")
  499. return {
  500. "current_stage": "ai_review",
  501. "overall_task_status": "terminated",
  502. "stage_status": {**state["stage_status"], "ai_review": "terminated"},
  503. "messages": [AIMessage(content="AI审查阶段检测到终止信号")]
  504. }
  505. # 获取文档处理结果中的结构化内容
  506. structured_content = state["document_result"].get("structured_content")
  507. if not structured_content:
  508. raise ValueError("文档处理结果中缺少结构化内容")
  509. # 获取 TaskFileInfo 实例
  510. task_file_info = state["task_file_info"]
  511. # 读取AI审查配置
  512. import configparser
  513. config = configparser.ConfigParser()
  514. config.read('config/config.ini', encoding='utf-8')
  515. max_review_units = config.getint('ai_review', 'MAX_REVIEW_UNITS', fallback=None)
  516. if max_review_units == 0:
  517. max_review_units = None
  518. review_mode = config.get('ai_review', 'REVIEW_MODE', fallback='all')
  519. logger.info(f"AI审查配置: 最大审查数量={max_review_units}, 审查模式={review_mode}")
  520. # 创建AI审查工作流实例(作为嵌套子图)
  521. ai_workflow = AIReviewWorkflow(
  522. task_file_info=task_file_info,
  523. structured_content=structured_content,
  524. progress_manager=state["progress_manager"],
  525. max_review_units=max_review_units,
  526. review_mode=review_mode
  527. )
  528. # 执行AI审查(内部使用 LangGraph)
  529. ai_result = await ai_workflow.execute()
  530. logger.info(f"AI审查完成: {state['callback_task_id']}")
  531. return {
  532. "current_stage": "ai_review",
  533. "overall_task_status": "processing",
  534. "stage_status": {**state["stage_status"], "ai_review": "completed"},
  535. "ai_review_result": ai_result,
  536. "messages": [AIMessage(content="AI审查完成")]
  537. }
  538. except Exception as e:
  539. logger.error(f"AI审查失败: {str(e)}", exc_info=True)
  540. return {
  541. "current_stage": "ai_review",
  542. "overall_task_status": "failed",
  543. "stage_status": {**state["stage_status"], "ai_review": "failed"},
  544. "error_message": f"AI审查失败: {str(e)}",
  545. "messages": [AIMessage(content=f"AI审查失败: {str(e)}")]
  546. }
  547. async def _report_generation_node(self, state: TaskChainState) -> TaskChainState:
  548. """
  549. 报告生成节点
  550. Args:
  551. state: 任务链状态
  552. Returns:
  553. TaskChainState: 更新后的状态,包含报告生成结果
  554. Note:
  555. 调用ReportWorkflow生成审查报告摘要(基于高中风险问题,使用LLM)
  556. 根据决策2(方案A-方式1),在此阶段生成完整报告后一次性保存
  557. """
  558. try:
  559. logger.info(f"开始报告生成阶段: {state['callback_task_id']}")
  560. # 检查终止信号
  561. if await self.check_terminate_signal(state["callback_task_id"]):
  562. logger.warning(f"报告生成阶段检测到终止信号: {state['callback_task_id']}")
  563. return {
  564. "current_stage": "report_generation",
  565. "overall_task_status": "terminated",
  566. "stage_status": {**state["stage_status"], "report": "terminated"},
  567. "messages": [AIMessage(content="报告生成阶段检测到终止信号")]
  568. }
  569. # 获取AI审查结果
  570. ai_review_result = state.get("ai_review_result")
  571. if not ai_review_result:
  572. raise ValueError("AI审查结果缺失,无法生成报告")
  573. # 获取 TaskFileInfo 实例
  574. task_file_info = state["task_file_info"]
  575. # 创建报告生成工作流实例
  576. report_workflow = ReportWorkflow(
  577. file_id=state["file_id"],
  578. file_name=state["file_name"],
  579. callback_task_id=state["callback_task_id"],
  580. user_id=state["user_id"],
  581. ai_review_results=ai_review_result,
  582. progress_manager=state["progress_manager"]
  583. )
  584. # 执行报告生成
  585. report_result = await report_workflow.execute()
  586. logger.info(f"报告生成完成: {state['callback_task_id']}")
  587. # 保存完整结果(包含文档处理、AI审查、报告生成)
  588. await self._save_complete_results(state, report_result)
  589. return {
  590. "current_stage": "report_generation",
  591. "overall_task_status": "processing",
  592. "stage_status": {**state["stage_status"], "report": "completed"},
  593. "report_result": report_result,
  594. "messages": [AIMessage(content="报告生成完成")]
  595. }
  596. except Exception as e:
  597. logger.error(f"报告生成失败: {str(e)}", exc_info=True)
  598. return {
  599. "current_stage": "report_generation",
  600. "overall_task_status": "failed",
  601. "stage_status": {**state["stage_status"], "report": "failed"},
  602. "error_message": f"报告生成失败: {str(e)}",
  603. "messages": [AIMessage(content=f"报告生成失败: {str(e)}")]
  604. }
  605. async def _complete_chain_node(self, state: TaskChainState) -> TaskChainState:
  606. """
  607. 任务链完成节点
  608. Args:
  609. state: 任务链状态
  610. Returns:
  611. TaskChainState: 更新后的状态,标记整体任务已完成
  612. Note:
  613. 只有在所有阶段(文档处理、AI审查、报告生成)都完成后才标记 overall_task_status="completed"
  614. 这解决了原有的状态语义混乱问题(P0-1)
  615. """
  616. logger.info(f"任务链工作流完成: {state['callback_task_id']}")
  617. # 标记整体任务完成
  618. if state["progress_manager"]:
  619. await state["progress_manager"].complete_task(
  620. state["callback_task_id"],
  621. state["user_id"],
  622. {"overall_task_status": "completed", "message": "所有阶段已完成"}
  623. )
  624. # 清理 Redis 缓存
  625. try:
  626. from foundation.utils.redis_utils import delete_file_info
  627. await delete_file_info(state["file_id"])
  628. logger.info(f"已清理 Redis 文件缓存: {state['file_id']}")
  629. except Exception as e:
  630. logger.warning(f"清理 Redis 文件缓存失败: {str(e)}")
  631. return {
  632. "current_stage": "complete",
  633. "overall_task_status": "completed", # ⚠️ 关键:只有到这里才标记整体完成
  634. "messages": [AIMessage(content="任务链工作流完成")]
  635. }
  636. async def _error_handler_chain_node(self, state: TaskChainState) -> TaskChainState:
  637. """
  638. 任务链错误处理节点
  639. Args:
  640. state: 任务链状态
  641. Returns:
  642. TaskChainState: 更新后的状态,标记为失败
  643. """
  644. logger.error(f"任务链工作流错误: {state['callback_task_id']}, 错误: {state.get('error_message', '未知错误')}")
  645. # 通知失败
  646. if state["progress_manager"]:
  647. error_data = {
  648. "overall_task_status": "failed",
  649. "error": state.get("error_message", "未知错误"),
  650. "status": "failed",
  651. "timestamp": datetime.now().isoformat()
  652. }
  653. await state["progress_manager"].complete_task(
  654. state["callback_task_id"],
  655. state["user_id"],
  656. error_data
  657. )
  658. # 清理 Redis 缓存(即使失败也清理)
  659. try:
  660. from foundation.utils.redis_utils import delete_file_info
  661. await delete_file_info(state["file_id"])
  662. logger.info(f"已清理 Redis 文件缓存: {state['file_id']}")
  663. except Exception as e:
  664. logger.warning(f"清理 Redis 文件缓存失败: {str(e)}")
  665. return {
  666. "current_stage": "error_handler",
  667. "overall_task_status": "failed",
  668. "messages": [AIMessage(content=f"任务链错误: {state.get('error_message', '未知错误')}")]
  669. }
  670. async def _terminate_chain_node(self, state: TaskChainState) -> TaskChainState:
  671. """
  672. 任务链终止节点
  673. Args:
  674. state: 任务链状态
  675. Returns:
  676. TaskChainState: 更新后的状态,标记为已终止
  677. """
  678. logger.warning(f"任务链工作流已终止: {state['callback_task_id']}")
  679. # 通知终止
  680. if state["progress_manager"]:
  681. await state["progress_manager"].complete_task(
  682. state["callback_task_id"],
  683. state["user_id"],
  684. {"overall_task_status": "terminated", "message": "任务已被用户终止"}
  685. )
  686. # 清理 Redis 终止信号
  687. await self.clear_terminate_signal(state["callback_task_id"])
  688. # 清理 Redis 文件缓存
  689. try:
  690. from foundation.utils.redis_utils import delete_file_info
  691. await delete_file_info(state["file_id"])
  692. logger.info(f"已清理 Redis 文件缓存: {state['file_id']}")
  693. except Exception as e:
  694. logger.warning(f"清理 Redis 文件缓存失败: {str(e)}")
  695. return {
  696. "current_stage": "terminated",
  697. "overall_task_status": "terminated",
  698. "messages": [AIMessage(content="任务链已被终止")]
  699. }
  700. def _should_terminate_or_error_chain(self, state: TaskChainState) -> str:
  701. """
  702. 检查任务链是否应该终止或发生错误
  703. Args:
  704. state: 任务链状态
  705. Returns:
  706. str: "terminate", "error", 或 "continue"
  707. Note:
  708. 这是条件边判断方法,用于决定工作流的下一步走向
  709. 1. 优先检查终止信号
  710. 2. 检查是否有错误
  711. 3. 都没有则继续执行
  712. """
  713. # 检查终止状态
  714. if state.get("overall_task_status") == "terminated":
  715. return "terminate"
  716. # 检查错误状态
  717. if state.get("overall_task_status") == "failed" or state.get("error_message"):
  718. return "error"
  719. # 默认继续执行
  720. return "continue"
  async def _save_complete_results(self, state: TaskChainState, report_result: Dict[str, Any]):
      """
      Persist the complete task result to a JSON file in a single write.

      Called once the report workflow has finished (plan A, option 1). The file
      bundles the document-processing result, the AI review result and the
      report-generation result, and is written to
      temp/construction_review/final_result/<callback_task_id>.json.

      Args:
          state: Task-chain state. callback_task_id, file_id, file_name, user_id
              and stage_status are read by key (must be present);
              document_result and ai_review_result are optional.
          report_result: Report-generation result to embed.

      Raises:
          Exception: Any failure is logged and re-raised.
      """
      try:
          import json
          import os

          logger.info(f"开始保存完整结果: {state['callback_task_id']}")

          temp_dir = os.path.join("temp", "construction_review", "final_result")
          os.makedirs(temp_dir, exist_ok=True)

          # ai_review_result is read with .get(), i.e. it may be missing or None;
          # guard the nested lookup so that case no longer raises AttributeError.
          ai_review_result = state.get("ai_review_result")
          complete_results = {
              "callback_task_id": state["callback_task_id"],
              "file_id": state["file_id"],
              "file_name": state["file_name"],
              "user_id": state["user_id"],
              # Still in progress here; the complete node marks it "completed"
              "overall_task_status": "processing",
              "stage_status": state["stage_status"],
              "document_result": state.get("document_result"),
              "ai_review_result": ai_review_result,
              "issues": (ai_review_result or {}).get("review_results"),
              "report_result": report_result,
              "timestamp": datetime.now().isoformat()
          }

          file_path = os.path.join(temp_dir, f"{state['callback_task_id']}.json")
          # Write to a temporary sibling and atomically swap it in, so a failure
          # part-way through json.dump (e.g. a non-serializable value) cannot
          # leave a truncated JSON file at file_path.
          tmp_path = f"{file_path}.tmp"
          try:
              with open(tmp_path, 'w', encoding='utf-8') as f:
                  json.dump(complete_results, f, ensure_ascii=False, indent=2)
              os.replace(tmp_path, file_path)
          finally:
              if os.path.exists(tmp_path):
                  os.remove(tmp_path)

          logger.info(f"完整结果已保存到: {file_path}")
      except Exception as e:
          logger.error(f"保存完整结果失败: {str(e)}", exc_info=True)
          raise
  # ==================== Construction-plan writing: task management methods ====================
  async def submit_outline_generation_task(self, task_info: dict) -> str:
      """
      Hand an outline-generation job off to Celery.

      Args:
          task_info: Task description dict with keys user_id, project_info,
              template_id, outline_config and, optionally, similarity_config
              and knowledge_config.

      Returns:
          str: The id of the submitted Celery task.

      Raises:
          Exception: Submission errors are logged and re-raised.
      """
      from foundation.infrastructure.messaging.tasks import submit_outline_generation_task as outline_celery_task
      from foundation.infrastructure.tracing.celery_trace import CeleryTraceManager

      try:
          logger.info(f"提交大纲生成任务到Celery: user_id={task_info.get('user_id')}")
          # Submitting through CeleryTraceManager propagates the current trace_id
          async_result = CeleryTraceManager.submit_celery_task(outline_celery_task, task_info)
          celery_task_id = async_result.id
          logger.info(f"大纲生成Celery任务已提交,Task ID: {celery_task_id}")
          return celery_task_id
      except Exception as err:
          logger.error(f"提交大纲生成Celery任务失败: {str(err)}")
          raise
  @track_execution_time
  def submit_outline_generation_sync(self, task_info: dict) -> dict:
      """
      Run an outline-generation task synchronously (entry point for the Celery worker).

      Executes the LangGraph outline workflow on a private event loop, updates
      the in-memory task record with the outcome, and mirrors the final result
      into a Redis hash so other processes can read it via get_outline_task_info.

      Args:
          task_info: Task description dict. Reads callback_task_id (optional;
              generated when missing), user_id, project_info, template_id,
              outline_config, similarity_config and knowledge_config.

      Returns:
          dict: Serializable summary of the workflow's final state.

      Raises:
          Exception: Any failure is logged, recorded on the task record via
              fail_processing, and re-raised.
      """
      import uuid
      from langchain_core.messages import HumanMessage
      from ..construction_write.component.state_models import OutlineGenerationState, OutlineTaskInfo
      from ..construction_write.workflows.outline_workflow import OutlineWorkflow

      callback_task_id = None
      # One private event loop drives every async step of this call (progress
      # initialisation, workflow run, Redis save). It is closed in `finally`, so
      # an exception can no longer leak it; previously the first loop was only
      # closed on the success path and a second loop was opened for the save.
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop)
      try:
          logger.info(f"开始执行大纲生成任务(LangGraph)")

          # 1. Use the caller-supplied task id, or generate one
          callback_task_id = task_info.get('callback_task_id') or f"outline_{uuid.uuid4().hex[:16]}"
          user_id = task_info.get('user_id', 'unknown')

          # 2. Build the task record
          outline_task_info = OutlineTaskInfo(
              callback_task_id=callback_task_id,
              user_id=user_id,
              project_info=task_info.get('project_info', {}),
              template_id=task_info.get('template_id', ''),
              outline_config=task_info.get('outline_config', {}),
              similarity_config=task_info.get('similarity_config', {}),
              knowledge_config=task_info.get('knowledge_config', {})
          )

          # 3. Track it as an active task (removed again in `finally`)
          self.active_outline_tasks[callback_task_id] = outline_task_info

          # 4. Initialise per-stage progress tracking
          loop.run_until_complete(self.progress_manager.initialize_progress(
              callback_task_id=callback_task_id,
              user_id=user_id,
              stages=[
                  {"stage": "start", "status": "pending"},
                  {"stage": "template_loading", "status": "pending"},
                  {"stage": "outline_generation", "status": "pending"},
                  {"stage": "similar_cases", "status": "pending"},
                  {"stage": "similar_fragments", "status": "pending"},
                  {"stage": "knowledge_bases", "status": "pending"},
                  {"stage": "complete", "status": "pending"}
              ]
          ))
          # 4.1 Register the ProgressManager so workflow nodes can look it up
          ProgressManagerRegistry.register_progress_manager(callback_task_id, self.progress_manager)
          # 4.2 Mark the task as started
          outline_task_info.start_processing()

          # 5. Build the LangGraph workflow lazily and cache it on the instance
          if self.outline_generation_graph is None:
              outline_workflow = OutlineWorkflow()
              self.outline_generation_graph = outline_workflow.build_graph()

          # 6. Initial state. progress_manager and task_info are not serializable,
          #    so they stay out of the state and are reached via instance attributes.
          initial_state = OutlineGenerationState(
              callback_task_id=callback_task_id,
              user_id=user_id,
              project_info=outline_task_info.project_info,
              template_id=outline_task_info.template_id,
              outline_config=outline_task_info.outline_config,
              similarity_config=outline_task_info.similarity_config,
              knowledge_config=outline_task_info.knowledge_config,
              template=None,
              outline_structure=None,
              key_points=None,
              similar_cases=None,
              similar_fragments=None,
              knowledge_bases=None,
              current_stage="start",
              overall_task_status="processing",
              error_message=None,
              messages=[HumanMessage(content=f"开始大纲生成任务: {callback_task_id}")]
          )

          # 7. Run the workflow; the checkpointer requires a thread_id in config
          result = loop.run_until_complete(
              self.outline_generation_graph.ainvoke(
                  initial_state,
                  config={"configurable": {"thread_id": callback_task_id}}
              )
          )
          logger.info(f"大纲生成任务完成!callback_task_id={callback_task_id}")

          # 8. Reflect the final status on the task record
          final_status = result.get("overall_task_status")
          if final_status == "completed":
              outline_task_info.complete_processing({
                  "outline_structure": result.get("outline_structure"),
                  "key_points": result.get("key_points"),
                  "similar_cases": result.get("similar_cases"),
                  "similar_fragments": result.get("similar_fragments"),
                  "knowledge_bases": result.get("knowledge_bases")
              })
          elif final_status == "failed":
              # `or` also covers an error_message key that is present but None
              outline_task_info.fail_processing(result.get("error_message") or "未知错误")
          elif final_status == "terminated":
              outline_task_info.cancel_processing()

          # 8.5 Persist the result to Redis for cross-process readers
          async def save_result_to_redis():
              redis_client = await RedisConnectionFactory.get_connection()
              result_key = f"{self._outline_result_prefix}{callback_task_id}"

              def json_or_empty(key):
                  # Redis hash values cannot be None: serialize truthy values, else ""
                  value = result.get(key)
                  return json.dumps(value, ensure_ascii=False) if value else ""

              result_data = {
                  "callback_task_id": callback_task_id,
                  "user_id": user_id if user_id is not None else "",
                  # Read back by get_outline_task_info, so store them as well
                  "project_name": str(outline_task_info.project_name or ""),
                  "project_type": str(outline_task_info.project_type or ""),
                  "overall_task_status": final_status or "",
                  "outline_structure": json_or_empty("outline_structure"),
                  "key_points": json_or_empty("key_points"),
                  "similar_cases": json_or_empty("similar_cases"),
                  "similar_fragments": json_or_empty("similar_fragments"),
                  "knowledge_bases": json_or_empty("knowledge_bases"),
                  "error_message": result.get("error_message") or "",
                  "completed_time": str(time.time())
              }
              # hset(mapping=...) replaces the deprecated hmset and matches the
              # terminate-signal code; the key expires after _task_expire_time.
              await redis_client.hset(result_key, mapping=result_data)
              await redis_client.expire(result_key, self._task_expire_time)
              logger.info(f"大纲生成结果已保存到 Redis: {callback_task_id}")

          loop.run_until_complete(save_result_to_redis())

          # 9. Return a serializable result
          return {
              "callback_task_id": result.get("callback_task_id"),
              "user_id": result.get("user_id"),
              "overall_task_status": result.get("overall_task_status"),
              "outline_structure": result.get("outline_structure"),
              "key_points": result.get("key_points"),
              "similar_cases": result.get("similar_cases"),
              "similar_fragments": result.get("similar_fragments"),
              "knowledge_bases": result.get("knowledge_bases"),
              "error_message": result.get("error_message")
          }
      except Exception as e:
          logger.error(f"大纲生成任务失败: {str(e)}", exc_info=True)
          # Record the failure on the task record before propagating
          if callback_task_id and callback_task_id in self.active_outline_tasks:
              self.active_outline_tasks[callback_task_id].fail_processing(str(e))
          raise
      finally:
          # Drop the active-task entry and the ProgressManager registration
          if callback_task_id and callback_task_id in self.active_outline_tasks:
              del self.active_outline_tasks[callback_task_id]
          if callback_task_id:
              ProgressManagerRegistry.unregister_progress_manager(callback_task_id)
          loop.close()
  async def set_outline_terminate_signal(self, callback_task_id: str, operator: str = "unknown") -> Dict[str, Any]:
      """
      Set a terminate signal for an outline-generation task.

      Stores a Redis hash (operator, terminate time, task id) under
      _outline_terminate_signal_prefix + callback_task_id, with an expiry of
      _task_expire_time. The running workflow is expected to observe it and stop.

      Args:
          callback_task_id: Callback ID of the task.
          operator: Who requested the termination.

      Returns:
          Dict[str, Any]: {"success": bool, "message": str, "task_info": dict | None}.
          Errors are logged and reported through this dict, not raised.
      """
      try:
          # NOTE(review): active_outline_tasks is this process's in-memory registry;
          # a task run by submit_outline_generation_sync in another process (e.g. a
          # Celery worker) will not be listed here — confirm this is intended.
          if callback_task_id not in self.active_outline_tasks:
              return {
                  "success": False,
                  "message": f"任务不存在或已完成: {callback_task_id}",
                  "task_info": None
              }

          task_info = self.active_outline_tasks[callback_task_id]

          # Only a task that is still processing can be terminated
          if task_info.status != "processing":
              return {
                  "success": False,
                  "message": f"任务状态不是 processing,无需终止: {callback_task_id} (当前状态: {task_info.status})",
                  "task_info": {
                      "callback_task_id": callback_task_id,
                      "status": task_info.status,
                      "project_name": task_info.project_name
                  }
              }

          # Write the terminate signal to Redis
          redis_client = await RedisConnectionFactory.get_connection()
          terminate_key = f"{self._outline_terminate_signal_prefix}{callback_task_id}"

          # Record who requested termination and when
          terminate_data = {
              "operator": operator,
              "terminate_time": str(time.time()),
              "task_id": callback_task_id
          }

          # A hash keeps the extra metadata alongside the signal
          await redis_client.hset(terminate_key, mapping=terminate_data)
          # Expire the signal after _task_expire_time
          await redis_client.expire(terminate_key, self._task_expire_time)

          logger.info(f"已设置大纲任务终止信号: {callback_task_id} (操作人: {operator}, 项目: {task_info.project_name})")

          return {
              "success": True,
              "message": f"终止信号已设置,任务将在当前节点完成后终止",
              "task_info": {
                  "callback_task_id": callback_task_id,
                  "user_id": task_info.user_id,
                  "project_name": task_info.project_name,
                  "status": task_info.status
              }
          }
      except Exception as e:
          logger.error(f"设置大纲任务终止信号失败: {str(e)}", exc_info=True)
          return {
              "success": False,
              "message": f"设置终止信号失败: {str(e)}",
              "task_info": None
          }
  async def check_outline_terminate_signal(self, callback_task_id: str) -> bool:
      """
      Check whether a terminate signal exists for an outline-generation task.

      Args:
          callback_task_id: Callback ID of the task.

      Returns:
          bool: True when the terminate-signal key exists in Redis. False when it
          does not, or when the lookup fails (the error is logged, not raised).
      """
      try:
          redis_client = await RedisConnectionFactory.get_connection()
          terminate_key = f"{self._outline_terminate_signal_prefix}{callback_task_id}"

          # The key's existence is the signal; the hash payload is only logged
          if not await redis_client.exists(terminate_key):
              return False

          terminate_info = await redis_client.hgetall(terminate_key)
          # Depending on the client's decode_responses setting, hgetall returns
          # str or bytes keys/values. Accept both, so the operator is logged
          # correctly and a decode problem cannot hide a real signal.
          operator = terminate_info.get("operator") or terminate_info.get(b"operator") or "unknown"
          if isinstance(operator, bytes):
              operator = operator.decode("utf-8", errors="replace")
          logger.warning(f"检测到大纲任务终止信号: {callback_task_id}, "
                         f"操作人: {operator}")
          return True
      except Exception as e:
          logger.error(f"检查大纲任务终止信号失败: {str(e)}", exc_info=True)
          return False
  async def clear_outline_terminate_signal(self, callback_task_id: str):
      """
      Delete the Redis terminate signal of an outline-generation task.

      This is best-effort: failures are logged as warnings and never raised.

      Args:
          callback_task_id: Callback ID of the task.
      """
      try:
          client = await RedisConnectionFactory.get_connection()
          await client.delete(f"{self._outline_terminate_signal_prefix}{callback_task_id}")
          logger.debug(f"清理大纲任务终止信号: {callback_task_id}")
      except Exception as exc:
          logger.warning(f"清理大纲任务终止信号失败: {str(exc)}")
  async def get_outline_active_tasks(self) -> list:
      """
      List the outline-generation tasks in this process that are still processing.

      Returns:
          list: One summary dict per task whose status is "processing".
          running_duration is in whole seconds, or 0 when start_time is unset.
          Returns an empty list if building the list fails (the error is logged).
      """
      try:
          now = time.time()
          return [
              {
                  "callback_task_id": task_id,
                  "user_id": info.user_id,
                  "project_name": info.project_name,
                  "project_type": info.project_type,
                  "status": info.status,
                  "start_time": info.start_time,
                  "running_duration": int(now - info.start_time) if info.start_time else 0
              }
              for task_id, info in self.active_outline_tasks.items()
              if info.status == "processing"
          ]
      except Exception as e:
          logger.error(f"获取活跃大纲任务列表失败: {str(e)}", exc_info=True)
          return []
  async def get_outline_task_info(self, callback_task_id: str) -> Optional[Dict]:
      """
      Get information about an outline-generation task.

      The in-memory record is used when the task is active in this process.
      Otherwise the result hash written to Redis by the worker is read, so
      results from another process (e.g. a Celery worker) are visible too.

      Args:
          callback_task_id: Callback ID of the task.

      Returns:
          Optional[Dict]: Task info dict, or None when the task is unknown or
          the lookup fails (the error is logged, not raised).
      """
      try:
          # Prefer the live in-memory record
          task_info = self.active_outline_tasks.get(callback_task_id)
          if task_info:
              current_time = time.time()
              return {
                  "callback_task_id": callback_task_id,
                  "user_id": task_info.user_id,
                  "project_name": task_info.project_name,
                  "project_type": task_info.project_type,
                  "status": task_info.status,
                  "start_time": task_info.start_time,
                  "running_duration": int(current_time - task_info.start_time) if task_info.start_time else 0,
                  "results": task_info.results
              }

          # Fall back to the result hash stored in Redis
          redis_client = await RedisConnectionFactory.get_connection()
          result_key = f"{self._outline_result_prefix}{callback_task_id}"
          raw_data = await redis_client.hgetall(result_key)
          if not raw_data:
              return None

          def as_text(value):
              # hgetall yields bytes keys/values unless the client decodes
              # responses; normalise so the str-keyed lookups below match.
              return value.decode("utf-8") if isinstance(value, bytes) else value

          result_data = {as_text(k): as_text(v) for k, v in raw_data.items()}

          # JSON-encoded fields; "" (or absent) means no value was stored
          parsed_results = {}
          for key in ["outline_structure", "key_points", "similar_cases", "similar_fragments", "knowledge_bases"]:
              value = result_data.get(key)
              if not value:
                  parsed_results[key] = None
                  continue
              try:
                  parsed_results[key] = json.loads(value)
              except (json.JSONDecodeError, TypeError):
                  parsed_results[key] = None

          # Map the workflow status onto the task-record status vocabulary
          overall_status = result_data.get("overall_task_status", "unknown")
          status_mapping = {
              "completed": "completed",
              "failed": "failed",
              "terminated": "cancelled"
          }
          status = status_mapping.get(overall_status, overall_status)

          return {
              "callback_task_id": result_data.get("callback_task_id"),
              "user_id": result_data.get("user_id"),
              "project_name": result_data.get("project_name", ""),
              "project_type": result_data.get("project_type", ""),
              "status": status,
              "start_time": None,
              "running_duration": 0,
              "results": {
                  "outline_structure": parsed_results.get("outline_structure"),
                  "key_points": parsed_results.get("key_points"),
                  "similar_cases": parsed_results.get("similar_cases"),
                  "similar_fragments": parsed_results.get("similar_fragments"),
                  "knowledge_bases": parsed_results.get("knowledge_bases"),
                  "error": result_data.get("error_message") or None
              }
          }
      except Exception as e:
          logger.error(f"获取大纲任务信息失败: {str(e)}", exc_info=True)
          return None