workflow_manager.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. import asyncio
  2. import json
  3. import time
  4. import uuid
  5. from typing import Any, Dict, Optional
  6. from langchain_core.messages import HumanMessage
  7. from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
  8. from foundation.infrastructure.tracing import TraceContext
  9. from foundation.observability.logger.loggering import write_logger as logger
  10. from foundation.observability.monitoring.time_statistics import track_execution_time
  11. from core.base.progress_manager import ProgressManager
  12. class ProgressManagerRegistry:
  13. """进度管理器注册表,用于在 LangGraph 工作流节点间共享 ProgressManager 实例。"""
  14. _registry: Dict[str, ProgressManager] = {}
  15. @classmethod
  16. def register_progress_manager(cls, callback_task_id: str, progress_manager: ProgressManager):
  17. """注册任务的进度管理器。
  18. Args:
  19. callback_task_id: 任务回调 ID
  20. progress_manager: ProgressManager 实例
  21. """
  22. cls._registry[callback_task_id] = progress_manager
  23. logger.info(f"注册 ProgressManager: {callback_task_id}")
  24. @classmethod
  25. def get_progress_manager(cls, callback_task_id: str) -> Optional[ProgressManager]:
  26. """获取指定任务的进度管理器。
  27. Args:
  28. callback_task_id: 任务回调 ID
  29. Returns:
  30. ProgressManager 实例,未找到则返回 None
  31. """
  32. return cls._registry.get(callback_task_id)
  33. @classmethod
  34. def unregister_progress_manager(cls, callback_task_id: Optional[str]):
  35. """注销指定任务的进度管理器。
  36. Args:
  37. callback_task_id: 任务回调 ID
  38. """
  39. if callback_task_id and callback_task_id in cls._registry:
  40. del cls._registry[callback_task_id]
  41. logger.info(f"注销 ProgressManager: {callback_task_id}")
  42. class WorkflowManager:
  43. """施工方案编写任务的工作流管理器。
  44. 负责:
  45. 1. 大纲生成任务的 Celery 提交与同步执行
  46. 2. LangGraph 工作流图的编译与调度
  47. 3. 任务终止信号管理(Redis 存储)
  48. 4. 任务结果的 Redis 持久化
  49. 5. 活跃任务状态查询
  50. """
  51. def __init__(self):
  52. self.progress_manager = ProgressManager()
  53. self.active_outline_tasks: Dict[str, Any] = {}
  54. self._outline_result_prefix = "outline_write:result:"
  55. self._outline_terminate_signal_prefix = "outline_write:terminate_signal:"
  56. self._task_expire_time = 7200
  57. self.outline_generation_graph = None
  58. def _run_async(self, coro):
  59. """在同步上下文中运行异步协程。
  60. Args:
  61. coro: 待执行的协程对象
  62. Returns:
  63. 协程的执行结果
  64. """
  65. loop = asyncio.new_event_loop()
  66. try:
  67. asyncio.set_event_loop(loop)
  68. return loop.run_until_complete(coro)
  69. finally:
  70. try:
  71. loop.close()
  72. finally:
  73. asyncio.set_event_loop(None)
  74. def _outline_progress_stages(self) -> list:
  75. """返回大纲生成任务的进度阶段定义。
  76. Returns:
  77. 阶段列表,每个阶段包含 stage 名称和 status
  78. """
  79. return [
  80. {"stage": "start", "status": "pending"},
  81. {"stage": "template_loading", "status": "pending"},
  82. {"stage": "outline_generation", "status": "pending"},
  83. {"stage": "similar_cases", "status": "pending"},
  84. {"stage": "similar_fragments", "status": "pending"},
  85. {"stage": "knowledge_bases", "status": "pending"},
  86. {"stage": "complete", "status": "pending"},
  87. ]
  88. async def submit_outline_generation_task(self, sgbx_task_info: dict) -> str:
  89. """提交大纲生成任务到 Celery 队列。
  90. 流程:预注册任务 → 初始化进度 → 提交 Celery → 更新状态
  91. Args:
  92. sgbx_task_info: 施工方案任务信息字典
  93. Returns:
  94. Celery 任务 ID
  95. Raises:
  96. Exception: Celery 任务提交失败时抛出
  97. """
  98. from foundation.infrastructure.messaging.tasks import submit_outline_generation_task
  99. callback_task_id = sgbx_task_info.get("callback_task_id")
  100. user_id = sgbx_task_info.get("user_id", "unknown")
  101. logger.info(
  102. f"Submit outline generation task: callback_task_id={callback_task_id}, user_id={user_id}"
  103. )
  104. await self._pre_register_outline_task(sgbx_task_info)
  105. await self.progress_manager.initialize_progress(
  106. callback_task_id=callback_task_id,
  107. user_id=user_id,
  108. stages=self._outline_progress_stages(),
  109. )
  110. await self.progress_manager.update_stage_progress(
  111. callback_task_id=callback_task_id,
  112. user_id=user_id,
  113. current=5,
  114. stage_name="任务提交中",
  115. status="processing",
  116. message="正在提交大纲生成任务...",
  117. overall_task_status="processing",
  118. )
  119. kwargs = {}
  120. trace_id = TraceContext.get_trace_id()
  121. if trace_id and trace_id != "no-trace":
  122. kwargs["_system_trace_id"] = trace_id
  123. try:
  124. task = submit_outline_generation_task.apply_async(
  125. args=[sgbx_task_info],
  126. kwargs=kwargs,
  127. queue="construction_write",
  128. )
  129. except Exception as exc:
  130. await self.progress_manager.update_stage_progress(
  131. callback_task_id=callback_task_id,
  132. user_id=user_id,
  133. current=5,
  134. stage_name="任务提交失败",
  135. status="failed",
  136. message=f"提交大纲生成任务失败: {exc}",
  137. overall_task_status="failed",
  138. event_type="failed",
  139. )
  140. raise
  141. await self._update_outline_result_status(
  142. callback_task_id,
  143. overall_task_status="processing",
  144. celery_task_id=task.id,
  145. submitted_time=str(time.time()),
  146. )
  147. await self.progress_manager.update_stage_progress(
  148. callback_task_id=callback_task_id,
  149. user_id=user_id,
  150. current=10,
  151. stage_name="任务已提交",
  152. status="submitted",
  153. message="大纲生成任务已提交,正在执行...",
  154. overall_task_status="processing",
  155. )
  156. logger.info(f"Outline generation Celery task submitted: {task.id}")
  157. return task.id
  158. @track_execution_time
  159. def submit_outline_generation_sync(self, sgbx_task_info: dict) -> dict:
  160. """同步执行大纲生成任务(Celery worker 内部调用)。
  161. Args:
  162. sgbx_task_info: 施工方案任务信息字典
  163. Returns:
  164. 包含大纲结构、关键要点、相似推荐等结果的字典
  165. """
  166. callback_task_id = sgbx_task_info.get("callback_task_id") or f"outline_{uuid.uuid4().hex[:16]}"
  167. return self._run_async(self._execute_outline_generation(sgbx_task_info, callback_task_id))
  168. async def _execute_outline_generation(self, sgbx_task_info: dict, callback_task_id: str) -> dict:
  169. """执行大纲生成工作流的核心方法。
  170. 完整流程:检查终止信号 → 构建初始状态 → 编译/加载 LangGraph 图 →
  171. 执行工作流 → 保存结果到 Redis
  172. Args:
  173. sgbx_task_info: 施工方案任务信息字典
  174. callback_task_id: 任务回调 ID
  175. Returns:
  176. 包含大纲结构、关键要点、相似推荐等结果的字典
  177. """
  178. from core.construction_write.component.state_models import OutlineGenerationState, OutlineTaskInfo
  179. from core.construction_write.workflows.outline_workflow import OutlineWorkflow
  180. user_id = sgbx_task_info.get("user_id", "unknown")
  181. outline_sgbx_task_info: Optional[OutlineTaskInfo] = None
  182. try:
  183. logger.info(f"Start outline generation workflow: {callback_task_id}")
  184. if await self.check_outline_terminate_signal(callback_task_id):
  185. return self._terminated_result(callback_task_id, user_id, "Task was cancelled before start")
  186. outline_sgbx_task_info = OutlineTaskInfo(
  187. callback_task_id=callback_task_id,
  188. user_id=user_id,
  189. project_info=sgbx_task_info.get("project_info", {}),
  190. template_id=sgbx_task_info.get("template_id", ""),
  191. generation_chapterenum=sgbx_task_info.get("generation_chapterenum", []),
  192. generation_template=sgbx_task_info.get("generation_template", []),
  193. similarity_config=sgbx_task_info.get(
  194. "similarity_config",
  195. {"topk_plans": 3, "topk_fragments": 10, "threshold": 0.75},
  196. ),
  197. knowledge_config=sgbx_task_info.get(
  198. "knowledge_config",
  199. {"topk": 3, "threshold": 0.75},
  200. ),
  201. )
  202. self.active_outline_tasks[callback_task_id] = outline_sgbx_task_info
  203. await self._update_outline_result_status(
  204. callback_task_id,
  205. overall_task_status="processing",
  206. worker_started_at=str(time.time()),
  207. )
  208. await self.progress_manager.initialize_progress(
  209. callback_task_id=callback_task_id,
  210. user_id=user_id,
  211. stages=self._outline_progress_stages(),
  212. )
  213. ProgressManagerRegistry.register_progress_manager(callback_task_id, self.progress_manager)
  214. outline_sgbx_task_info.start_processing()
  215. if self.outline_generation_graph is None:
  216. self.outline_generation_graph = OutlineWorkflow().build_graph()
  217. initial_state = OutlineGenerationState(
  218. callback_task_id=callback_task_id,
  219. user_id=user_id,
  220. project_info=outline_sgbx_task_info.project_info,
  221. template_id=outline_sgbx_task_info.template_id,
  222. generation_chapterenum=outline_sgbx_task_info.generation_chapterenum,
  223. generation_template=outline_sgbx_task_info.generation_template,
  224. similarity_config=outline_sgbx_task_info.similarity_config,
  225. knowledge_config=outline_sgbx_task_info.knowledge_config,
  226. template=None,
  227. outline_structure=None,
  228. key_points=None,
  229. similar_cases=None,
  230. similar_fragments=None,
  231. knowledge_bases=None,
  232. current_stage="start",
  233. overall_task_status="processing",
  234. error_message=None,
  235. messages=[HumanMessage(content=f"Start outline generation task {callback_task_id}")],
  236. )
  237. result = await self.outline_generation_graph.ainvoke(
  238. initial_state,
  239. config={"configurable": {"thread_id": callback_task_id}},
  240. )
  241. status = result.get("overall_task_status")
  242. if status == "completed":
  243. outline_sgbx_task_info.complete_processing(
  244. {
  245. "outline_structure": result.get("outline_structure"),
  246. "key_points": result.get("key_points"),
  247. "similar_cases": result.get("similar_cases"),
  248. "similar_fragments": result.get("similar_fragments"),
  249. "knowledge_bases": result.get("knowledge_bases"),
  250. }
  251. )
  252. elif status == "failed":
  253. outline_sgbx_task_info.fail_processing(result.get("error_message", "unknown error"))
  254. elif status == "terminated":
  255. outline_sgbx_task_info.cancel_processing()
  256. await self._save_result_to_redis(callback_task_id, user_id, result)
  257. return {
  258. "callback_task_id": result.get("callback_task_id"),
  259. "user_id": result.get("user_id"),
  260. "overall_task_status": result.get("overall_task_status"),
  261. "outline_structure": result.get("outline_structure"),
  262. "key_points": result.get("key_points"),
  263. "similar_cases": result.get("similar_cases"),
  264. "similar_fragments": result.get("similar_fragments"),
  265. "knowledge_bases": result.get("knowledge_bases"),
  266. "error_message": result.get("error_message"),
  267. }
  268. except Exception as exc:
  269. logger.error(f"Outline generation task failed: {exc}", exc_info=True)
  270. if outline_sgbx_task_info:
  271. outline_sgbx_task_info.fail_processing(str(exc))
  272. error_message = str(exc)
  273. failed_result = {
  274. "callback_task_id": callback_task_id,
  275. "user_id": user_id,
  276. "overall_task_status": "failed",
  277. "outline_structure": None,
  278. "key_points": None,
  279. "similar_cases": None,
  280. "similar_fragments": None,
  281. "knowledge_bases": None,
  282. "error_message": error_message,
  283. }
  284. try:
  285. await self.progress_manager.update_stage_progress(
  286. callback_task_id=callback_task_id,
  287. user_id=user_id,
  288. stage_name="任务失败",
  289. status="failed",
  290. message=f"大纲生成任务失败: {error_message}",
  291. overall_task_status="failed",
  292. event_type="failed",
  293. )
  294. await self._save_result_to_redis(callback_task_id, user_id, failed_result)
  295. except Exception as update_exc:
  296. logger.warning(
  297. f"Failed to persist outline failure state: {callback_task_id}, {update_exc}",
  298. exc_info=True,
  299. )
  300. raise
  301. finally:
  302. self.active_outline_tasks.pop(callback_task_id, None)
  303. ProgressManagerRegistry.unregister_progress_manager(callback_task_id)
  304. def _terminated_result(self, callback_task_id: str, user_id: str, message: str) -> dict:
  305. """构建任务终止时的返回结果。
  306. Args:
  307. callback_task_id: 任务回调 ID
  308. user_id: 用户 ID
  309. message: 终止原因
  310. Returns:
  311. 终止状态字典
  312. """
  313. return {
  314. "callback_task_id": callback_task_id,
  315. "user_id": user_id,
  316. "overall_task_status": "terminated",
  317. "outline_structure": None,
  318. "key_points": None,
  319. "similar_cases": None,
  320. "similar_fragments": None,
  321. "knowledge_bases": None,
  322. "error_message": message,
  323. }
  324. async def _save_result_to_redis(self, callback_task_id: str, user_id: str, result: dict):
  325. """将大纲生成结果保存到 Redis。
  326. Args:
  327. callback_task_id: 任务回调 ID
  328. user_id: 用户 ID
  329. result: 工作流执行结果字典
  330. """
  331. redis_client = await RedisConnectionFactory.get_connection()
  332. result_key = f"{self._outline_result_prefix}{callback_task_id}"
  333. result_data = {
  334. "callback_task_id": callback_task_id,
  335. "user_id": user_id,
  336. "overall_task_status": result.get("overall_task_status", ""),
  337. "outline_structure": json.dumps(result.get("outline_structure"), ensure_ascii=False)
  338. if result.get("outline_structure")
  339. else "",
  340. "key_points": json.dumps(result.get("key_points"), ensure_ascii=False)
  341. if result.get("key_points")
  342. else "",
  343. "similar_cases": json.dumps(result.get("similar_cases"), ensure_ascii=False)
  344. if result.get("similar_cases")
  345. else "",
  346. "similar_fragments": json.dumps(result.get("similar_fragments"), ensure_ascii=False)
  347. if result.get("similar_fragments")
  348. else "",
  349. "knowledge_bases": json.dumps(result.get("knowledge_bases"), ensure_ascii=False)
  350. if result.get("knowledge_bases")
  351. else "",
  352. "error_message": result.get("error_message") or "",
  353. "completed_time": str(time.time()),
  354. }
  355. await redis_client.hmset(result_key, result_data)
  356. await redis_client.expire(result_key, self._task_expire_time)
  357. logger.info(f"Outline generation result saved to Redis: {callback_task_id}")
  358. async def _update_outline_result_status(self, callback_task_id: str, **fields):
  359. """更新 Redis 中任务状态的字段。
  360. Args:
  361. callback_task_id: 任务回调 ID
  362. **fields: 需要更新的字段键值对
  363. """
  364. if not callback_task_id:
  365. return
  366. try:
  367. redis_client = await RedisConnectionFactory.get_connection()
  368. result_key = f"{self._outline_result_prefix}{callback_task_id}"
  369. fields = {key: "" if value is None else str(value) for key, value in fields.items()}
  370. if not fields:
  371. return
  372. await redis_client.hmset(result_key, fields)
  373. await redis_client.expire(result_key, self._task_expire_time)
  374. except Exception as exc:
  375. logger.warning(f"Update outline result status failed: {callback_task_id}, {exc}")
  376. async def _pre_register_outline_task(self, sgbx_task_info: dict):
  377. """预注册任务信息到 Redis(在 Celery 提交前调用,用于进度查询兜底)。
  378. Args:
  379. sgbx_task_info: 施工方案任务信息字典
  380. """
  381. try:
  382. callback_task_id = sgbx_task_info.get("callback_task_id")
  383. user_id = sgbx_task_info.get("user_id", "unknown")
  384. project_info = sgbx_task_info.get("project_info", {})
  385. redis_client = await RedisConnectionFactory.get_connection()
  386. result_key = f"{self._outline_result_prefix}{callback_task_id}"
  387. await redis_client.hmset(
  388. result_key,
  389. {
  390. "callback_task_id": callback_task_id,
  391. "user_id": user_id,
  392. "project_name": project_info.get("project_name", ""),
  393. "project_type": project_info.get("engineering_type", ""),
  394. "overall_task_status": "pending",
  395. "outline_structure": "",
  396. "key_points": "",
  397. "similar_cases": "",
  398. "similar_fragments": "",
  399. "knowledge_bases": "",
  400. "error_message": "",
  401. "celery_task_id": "",
  402. "pre_registered": "true",
  403. "pre_registered_at": str(time.time()),
  404. "completed_time": "",
  405. },
  406. )
  407. await redis_client.expire(result_key, self._task_expire_time)
  408. except Exception as exc:
  409. logger.error(f"Pre-register outline task failed: {exc}", exc_info=True)
  410. async def set_outline_terminate_signal(
  411. self,
  412. callback_task_id: str,
  413. operator: str = "unknown",
  414. reason: str = "",
  415. ) -> Dict[str, Any]:
  416. """设置大纲生成任务的终止信号。
  417. Args:
  418. callback_task_id: 任务回调 ID
  419. operator: 操作人标识
  420. reason: 终止原因
  421. Returns:
  422. 包含 success/message/sgbx_task_info 的结果字典
  423. """
  424. try:
  425. task_status = None
  426. task_user_id = "unknown"
  427. project_name = ""
  428. if callback_task_id in self.active_outline_tasks:
  429. task_info = self.active_outline_tasks[callback_task_id]
  430. task_status = task_info.status
  431. task_user_id = task_info.user_id
  432. project_name = task_info.project_name
  433. else:
  434. redis_client = await RedisConnectionFactory.get_connection()
  435. result_key = f"{self._outline_result_prefix}{callback_task_id}"
  436. result_data = await redis_client.hgetall(result_key)
  437. if not result_data:
  438. return {"success": False, "message": f"Task not found: {callback_task_id}", "sgbx_task_info": None}
  439. task_status = result_data.get("overall_task_status", "unknown")
  440. task_user_id = result_data.get("user_id", "unknown")
  441. project_name = result_data.get("project_name", "")
  442. if task_status not in ["pending", "processing", "submitted"]:
  443. return {
  444. "success": False,
  445. "message": f"Task cannot be cancelled in status {task_status}: {callback_task_id}",
  446. "sgbx_task_info": {
  447. "callback_task_id": callback_task_id,
  448. "status": task_status,
  449. "project_name": project_name,
  450. },
  451. }
  452. redis_client = await RedisConnectionFactory.get_connection()
  453. terminate_key = f"{self._outline_terminate_signal_prefix}{callback_task_id}"
  454. await redis_client.hmset(
  455. terminate_key,
  456. {
  457. "operator": operator,
  458. "reason": reason,
  459. "terminate_time": str(time.time()),
  460. "task_id": callback_task_id,
  461. },
  462. )
  463. await redis_client.expire(terminate_key, self._task_expire_time)
  464. await self._update_outline_result_status(
  465. callback_task_id,
  466. overall_task_status="terminated",
  467. error_message=reason or "Task cancellation requested",
  468. terminated_by=operator,
  469. terminated_time=str(time.time()),
  470. )
  471. if task_status == "pending":
  472. return {
  473. "success": True,
  474. "message": "Task cancelled before start",
  475. "sgbx_task_info": {
  476. "callback_task_id": callback_task_id,
  477. "user_id": task_user_id,
  478. "project_name": project_name,
  479. "status": "cancelled",
  480. },
  481. }
  482. return {
  483. "success": True,
  484. "message": "Terminate signal set",
  485. "sgbx_task_info": {
  486. "callback_task_id": callback_task_id,
  487. "user_id": task_user_id,
  488. "project_name": project_name,
  489. "status": "cancelled",
  490. },
  491. }
  492. except Exception as exc:
  493. logger.error(f"Set terminate signal failed: {exc}", exc_info=True)
  494. return {"success": False, "message": f"Set terminate signal failed: {exc}", "sgbx_task_info": None}
  495. async def check_outline_terminate_signal(self, callback_task_id: str) -> bool:
  496. """检查任务是否存在终止信号。
  497. Args:
  498. callback_task_id: 任务回调 ID
  499. Returns:
  500. True 表示有终止信号,False 表示无
  501. """
  502. try:
  503. redis_client = await RedisConnectionFactory.get_connection()
  504. terminate_key = f"{self._outline_terminate_signal_prefix}{callback_task_id}"
  505. exists = await redis_client.exists(terminate_key)
  506. if exists:
  507. logger.warning(f"Detected outline terminate signal: {callback_task_id}")
  508. return True
  509. return False
  510. except Exception as exc:
  511. logger.error(f"Check terminate signal failed: {exc}", exc_info=True)
  512. return False
  513. async def clear_outline_terminate_signal(self, callback_task_id: str):
  514. """清除任务的终止信号。
  515. Args:
  516. callback_task_id: 任务回调 ID
  517. """
  518. try:
  519. redis_client = await RedisConnectionFactory.get_connection()
  520. terminate_key = f"{self._outline_terminate_signal_prefix}{callback_task_id}"
  521. await redis_client.delete(terminate_key)
  522. except Exception as exc:
  523. logger.warning(f"Clear terminate signal failed: {exc}")
  524. async def get_outline_active_tasks(self) -> list:
  525. """获取当前正在处理中的大纲任务列表。
  526. Returns:
  527. 活跃任务信息列表,包含任务 ID、用户、项目名、运行时长等
  528. """
  529. current_time = time.time()
  530. active_tasks = []
  531. for task_id, task_info in self.active_outline_tasks.items():
  532. if task_info.status == "processing":
  533. active_tasks.append(
  534. {
  535. "callback_task_id": task_id,
  536. "user_id": task_info.user_id,
  537. "project_name": task_info.project_name,
  538. "project_type": task_info.project_type,
  539. "status": task_info.status,
  540. "start_time": task_info.start_time,
  541. "running_duration": int(current_time - task_info.start_time)
  542. if task_info.start_time
  543. else 0,
  544. }
  545. )
  546. return active_tasks
  547. async def get_outline_sgbx_task_info(self, callback_task_id: str) -> Optional[Dict[str, Any]]:
  548. """获取指定大纲任务的详细信息。
  549. 优先从内存中的活跃任务获取,其次从 Redis 中读取持久化结果。
  550. Args:
  551. callback_task_id: 任务回调 ID
  552. Returns:
  553. 任务信息字典,未找到则返回 None
  554. """
  555. task_info = self.active_outline_tasks.get(callback_task_id)
  556. if task_info:
  557. current_time = time.time()
  558. return {
  559. "callback_task_id": callback_task_id,
  560. "user_id": task_info.user_id,
  561. "project_name": task_info.project_name,
  562. "project_type": task_info.project_type,
  563. "status": task_info.status,
  564. "start_time": task_info.start_time,
  565. "running_duration": int(current_time - task_info.start_time) if task_info.start_time else 0,
  566. "results": task_info.results,
  567. }
  568. try:
  569. redis_client = await RedisConnectionFactory.get_connection()
  570. result_key = f"{self._outline_result_prefix}{callback_task_id}"
  571. result_data = await redis_client.hgetall(result_key)
  572. if not result_data:
  573. return None
  574. parsed_results = {}
  575. for key in ["outline_structure", "key_points", "similar_cases", "similar_fragments", "knowledge_bases"]:
  576. value = result_data.get(key)
  577. if value:
  578. try:
  579. parsed_results[key] = json.loads(value)
  580. except (json.JSONDecodeError, TypeError):
  581. parsed_results[key] = None
  582. else:
  583. parsed_results[key] = None
  584. overall_status = result_data.get("overall_task_status", "unknown")
  585. status = {
  586. "completed": "completed",
  587. "failed": "failed",
  588. "terminated": "cancelled",
  589. "pending": "pending",
  590. "processing": "processing",
  591. "submitted": "processing",
  592. }.get(overall_status, overall_status)
  593. response = {
  594. "callback_task_id": result_data.get("callback_task_id"),
  595. "user_id": result_data.get("user_id"),
  596. "project_name": result_data.get("project_name", ""),
  597. "project_type": result_data.get("project_type", ""),
  598. "status": status,
  599. "start_time": None,
  600. "running_duration": 0,
  601. "results": {
  602. "outline_structure": parsed_results.get("outline_structure"),
  603. "key_points": parsed_results.get("key_points"),
  604. "similar_cases": parsed_results.get("similar_cases"),
  605. "similar_fragments": parsed_results.get("similar_fragments"),
  606. "knowledge_bases": parsed_results.get("knowledge_bases"),
  607. "error": result_data.get("error_message") or None,
  608. },
  609. }
  610. if result_data.get("pre_registered") == "true":
  611. response["is_pre_registered"] = True
  612. response["pre_registered_at"] = result_data.get("pre_registered_at")
  613. return response
  614. except Exception as exc:
  615. logger.error(f"Get outline task info failed: {exc}", exc_info=True)
  616. return None
  617. workflow_manager = WorkflowManager()