workflow_manager.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. """
  2. 基于LangGraph的工作流管理器
  3. 负责任务的创建、编排和执行,使用LangGraph进行状态管理
  4. """
  5. import asyncio
  6. import uuid
  7. from typing import Dict, Optional, TypedDict, Annotated, List
  8. from datetime import datetime
  9. from dataclasses import dataclass
  10. from langgraph.graph import StateGraph, END
  11. from langgraph.graph.message import add_messages
  12. from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
  13. from foundation.logger.loggering import server_logger as logger
  14. from foundation.utils.time_statistics import track_execution_time
  15. from .progress_manager import ProgressManager
  16. from .redis_duplicate_checker import RedisDuplicateChecker
  17. from ..construction_review.workflows import DocumentWorkflow,AIReviewWorkflow,ReportWorkflow
  18. @dataclass
  19. class TaskChain:
  20. """任务链"""
  21. callback_task_id: str
  22. file_id: str
  23. user_id: str
  24. status: str # processing, completed, failed
  25. current_stage: str
  26. created_at: datetime
  27. started_at: Optional[datetime] = None
  28. completed_at: Optional[datetime] = None
  29. results: Dict = None
  30. def __post_init__(self):
  31. if self.results is None:
  32. self.results = {}
  33. class WorkflowManager:
  34. """工作流管理器"""
  35. def __init__(self, max_concurrent_docs: int = 5, max_concurrent_reviews: int = 10):
  36. self.max_concurrent_docs = max_concurrent_docs
  37. self.max_concurrent_reviews = max_concurrent_reviews
  38. # 并发控制
  39. self.doc_semaphore = asyncio.Semaphore(max_concurrent_docs)
  40. self.review_semaphore = asyncio.Semaphore(max_concurrent_reviews)
  41. # 服务组件
  42. self.progress_manager = ProgressManager()
  43. self.redis_duplicate_checker = RedisDuplicateChecker()
  44. # 活跃任务跟踪
  45. self.active_chains: Dict[str, TaskChain] = {}
  46. self._cleanup_task_started = False
  47. async def submit_task_processing(self, file_info: dict) -> str:
  48. """异步提交任务处理(用于file_upload层)"""
  49. from foundation.base.tasks import submit_task_processing_task
  50. try:
  51. logger.info(f"提交文档处理任务到Celery: {file_info['file_id']}")
  52. # 提交到Celery队列
  53. task = submit_task_processing_task.delay(file_info)
  54. logger.info(f"Celery任务已提交,Task ID: {task.id}")
  55. return task.id
  56. except Exception as e:
  57. logger.error(f"提交Celery任务失败: {str(e)}")
  58. raise
  59. @track_execution_time
  60. def submit_task_processing_sync(self, file_info: dict) -> dict:
  61. """同步提交任务处理(用于Celery worker)"""
  62. try:
  63. logger.info(f"提交文档处理任务: {file_info['file_id']}")
  64. # 1. 生成任务链ID
  65. callback_task_id = file_info['callback_task_id']
  66. # 2. 创建任务链
  67. task_chain = TaskChain(
  68. callback_task_id=callback_task_id,
  69. file_id=file_info['file_id'],
  70. user_id=file_info['user_id'],
  71. status="processing",
  72. current_stage="document_processing",
  73. created_at=datetime.now()
  74. )
  75. # 4. 注册任务
  76. asyncio.run(self.redis_duplicate_checker.register_task(file_info, callback_task_id))
  77. self.active_chains[callback_task_id] = task_chain
  78. # 5. 初始化进度
  79. asyncio.run(self.progress_manager.initialize_progress(
  80. callback_task_id=callback_task_id,
  81. user_id=file_info['user_id'],
  82. stages=[
  83. {"stage_name": "文件上传", "progress": 100, "status": "completed"},
  84. {"stage_name": "文档处理", "progress": 0, "status": "pending"},
  85. {"stage_name": "AI审查", "progress": 0, "status": "pending"},
  86. {"stage_name": "报告生成", "progress": 0, "status": "pending"}
  87. ]
  88. ))
  89. # 6. 启动处理流程(同步执行)
  90. self._process_task_chain_sync(task_chain, file_info['file_content'], file_info['file_type'])
  91. # logger.info(f"提交文档处理任务: {callback_task_id}")
  92. logger.info(f"施工方案审查任务已完成! ")
  93. logger.info(f"文件ID: {file_info['file_id']}")
  94. logger.info(f"文件名:{file_info['file_name']}")
  95. except Exception as e:
  96. logger.error(f"提交文档处理任务失败: {str(e)}")
  97. raise
  98. async def _process_task_chain(self, task_chain: TaskChain, file_content: bytes, file_type: str):
  99. """处理文档任务链 - 串行执行,内部并发"""
  100. try:
  101. task_chain.started_at = datetime.now()
  102. # 阶段1:文档处理(串行)
  103. async with self.doc_semaphore:
  104. task_chain.current_stage = "document_processing"
  105. document_workflow = DocumentWorkflow(
  106. file_id=task_chain.file_id,
  107. callback_task_id=task_chain.callback_task_id,
  108. user_id=task_chain.user_id,
  109. progress_manager=self.progress_manager,
  110. redis_duplicate_checker=self.redis_duplicate_checker
  111. )
  112. doc_result = await document_workflow.execute(file_content, file_type)
  113. task_chain.results['document'] = doc_result
  114. # 阶段2:AI审查(内部并发)
  115. task_chain.current_stage = "ai_review"
  116. structured_content = doc_result['structured_content']
  117. ai_workflow = AIReviewWorkflow(
  118. file_id=task_chain.file_id,
  119. callback_task_id=task_chain.callback_task_id,
  120. user_id=task_chain.user_id,
  121. structured_content=structured_content,
  122. progress_manager=self.progress_manager
  123. )
  124. ai_result = await ai_workflow.execute()
  125. task_chain.results['ai_review'] = ai_result
  126. # 阶段3:报告生成(串行)
  127. task_chain.current_stage = "report_generation"
  128. report_workflow = ReportWorkflow(
  129. file_id=task_chain.file_id,
  130. callback_task_id=task_chain.callback_task_id,
  131. user_id=task_chain.user_id,
  132. ai_review_results=ai_result,
  133. progress_manager=self.progress_manager
  134. )
  135. report_result = await report_workflow.execute()
  136. task_chain.results['report'] = report_result
  137. # 完成任务链
  138. task_chain.status = "completed"
  139. task_chain.completed_at = datetime.now()
  140. # 清理任务注册
  141. await self.redis_duplicate_checker.unregister_task(task_chain.file_id)
  142. logger.info(f"文档处理任务链完成: {task_chain.callback_task_id}")
  143. except Exception as e:
  144. task_chain.status = "failed"
  145. logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")
  146. # 清理任务注册
  147. await self.redis_duplicate_checker.unregister_task(task_chain.file_id)
  148. raise
  149. finally:
  150. # 清理活跃任务
  151. if task_chain.callback_task_id in self.active_chains:
  152. del self.active_chains[task_chain.callback_task_id]
  153. def _process_task_chain_sync(self, task_chain: TaskChain, file_content: bytes, file_type: str):
  154. """同步处理文档任务链(用于Celery worker)"""
  155. try:
  156. task_chain.started_at = datetime.now()
  157. # 阶段1:文档处理(串行)
  158. task_chain.current_stage = "document_processing"
  159. document_workflow = DocumentWorkflow(
  160. file_id=task_chain.file_id,
  161. callback_task_id=task_chain.callback_task_id,
  162. user_id=task_chain.user_id,
  163. progress_manager=self.progress_manager,
  164. redis_duplicate_checker=self.redis_duplicate_checker
  165. )
  166. # 同步执行文档处理
  167. loop = asyncio.new_event_loop()
  168. asyncio.set_event_loop(loop)
  169. doc_result = loop.run_until_complete(document_workflow.execute(file_content, file_type))
  170. loop.close()
  171. task_chain.results['document'] = doc_result
  172. # 阶段2:AI审查(内部并发)
  173. task_chain.current_stage = "ai_review"
  174. structured_content = doc_result['structured_content']
  175. ai_workflow = AIReviewWorkflow(
  176. file_id=task_chain.file_id,
  177. callback_task_id=task_chain.callback_task_id,
  178. user_id=task_chain.user_id,
  179. structured_content=structured_content,
  180. progress_manager=self.progress_manager
  181. )
  182. # 同步执行AI审查
  183. loop = asyncio.new_event_loop()
  184. asyncio.set_event_loop(loop)
  185. ai_result = loop.run_until_complete(ai_workflow.execute())
  186. loop.close()
  187. task_chain.results['ai_review'] = ai_result
  188. # 阶段3:报告生成(串行)
  189. task_chain.current_stage = "report_generation"
  190. report_workflow = ReportWorkflow(
  191. file_id=task_chain.file_id,
  192. callback_task_id=task_chain.callback_task_id,
  193. user_id=task_chain.user_id,
  194. ai_review_results=ai_result,
  195. progress_manager=self.progress_manager
  196. )
  197. # 同步执行报告生成
  198. loop = asyncio.new_event_loop()
  199. asyncio.set_event_loop(loop)
  200. report_result = loop.run_until_complete(report_workflow.execute())
  201. loop.close()
  202. task_chain.results['report'] = report_result
  203. # 完成任务链
  204. task_chain.status = "completed"
  205. task_chain.completed_at = datetime.now()
  206. # 清理任务注册
  207. asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
  208. logger.info(f"文档处理任务链完成: {task_chain.callback_task_id}")
  209. return task_chain.results
  210. except Exception as e:
  211. task_chain.status = "failed"
  212. logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")
  213. # 清理任务注册
  214. asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
  215. raise
  216. finally:
  217. # 清理活跃任务
  218. if task_chain.callback_task_id in self.active_chains:
  219. del self.active_chains[task_chain.callback_task_id]
    async def update_task_status(self, callback_task_id: str) -> Optional[Dict]:
        """Update/report the status of a task chain.

        NOTE(review): unimplemented stub — always returns None. Presumably
        intended to look up `callback_task_id` in `self.active_chains` and/or
        the progress manager; confirm intended behavior before implementing.
        """
        pass