workflow_manager.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. """
  2. 基于LangGraph的工作流管理器
  3. 负责任务的创建、编排和执行,使用LangGraph进行状态管理
  4. """
  5. import asyncio
  6. from typing import Dict, Optional
  7. from datetime import datetime
  8. from foundation.observability.logger.loggering import server_logger as logger
  9. from foundation.observability.monitoring.time_statistics import track_execution_time
  10. from .progress_manager import ProgressManager
  11. from .redis_duplicate_checker import RedisDuplicateChecker
  12. from .task_models import TaskFileInfo, TaskChain
  13. from ..construction_review.workflows import DocumentWorkflow,AIReviewWorkflow
  14. class ProgressManagerRegistry:
  15. """ProgressManager注册表 - 为每个任务管理独立的ProgressManager实例"""
  16. _registry = {} # {callback_task_id: ProgressManager}
  17. @classmethod
  18. def register_progress_manager(cls, callback_task_id: str, progress_manager: ProgressManager):
  19. """注册ProgressManager实例"""
  20. cls._registry[callback_task_id] = progress_manager
  21. logger.info(f"注册ProgressManager实例: {callback_task_id}, ID: {id(progress_manager)}")
  22. @classmethod
  23. def get_progress_manager(cls, callback_task_id: str) -> ProgressManager:
  24. """获取ProgressManager实例"""
  25. return cls._registry.get(callback_task_id)
  26. @classmethod
  27. def unregister_progress_manager(cls, callback_task_id: str):
  28. """注销ProgressManager实例"""
  29. if callback_task_id in cls._registry:
  30. del cls._registry[callback_task_id]
  31. logger.info(f"注销ProgressManager实例: {callback_task_id}")
  32. class WorkflowManager:
  33. """工作流管理器"""
  34. def __init__(self, max_concurrent_docs: int = 5, max_concurrent_reviews: int = 10):
  35. self.max_concurrent_docs = max_concurrent_docs
  36. self.max_concurrent_reviews = max_concurrent_reviews
  37. # 并发控制
  38. self.doc_semaphore = asyncio.Semaphore(max_concurrent_docs)
  39. self.review_semaphore = asyncio.Semaphore(max_concurrent_reviews)
  40. # 服务组件
  41. self.progress_manager = ProgressManager()
  42. self.redis_duplicate_checker = RedisDuplicateChecker()
  43. # 活跃任务跟踪
  44. self.active_chains: Dict[str, TaskChain] = {}
  45. self._cleanup_task_started = False
  46. async def submit_task_processing(self, file_info: dict) -> str:
  47. """异步提交任务处理(用于file_upload层)"""
  48. from foundation.infrastructure.messaging.tasks import submit_task_processing_task
  49. from foundation.infrastructure.tracing.celery_trace import CeleryTraceManager
  50. try:
  51. logger.info(f"提交文档处理任务到Celery: {file_info['file_id']}")
  52. # 使用CeleryTraceManager提交任务,自动传递trace_id
  53. task = CeleryTraceManager.submit_celery_task(
  54. submit_task_processing_task,
  55. file_info
  56. )
  57. logger.info(f"Celery任务已提交,Task ID: {task.id}")
  58. return task.id
  59. except Exception as e:
  60. logger.error(f"提交Celery任务失败: {str(e)}")
  61. raise
  62. @track_execution_time
  63. def submit_task_processing_sync(self, file_info: dict) -> dict:
  64. """同步提交任务处理(用于Celery worker)"""
  65. try:
  66. logger.info(f"提交文档处理任务: {file_info['file_id']}")
  67. # 1. 创建TaskFileInfo对象(封装任务文件信息)
  68. task_file_info = TaskFileInfo(file_info)
  69. logger.info(f"创建任务文件信息: {task_file_info}")
  70. # 2. 生成任务链ID
  71. callback_task_id = task_file_info.callback_task_id
  72. # 3. 创建任务链(引用 TaskFileInfo,避免数据重复)
  73. task_chain = TaskChain(task_file_info)
  74. # 4. 标记任务开始
  75. task_chain.start_processing()
  76. # 5. 添加到活跃任务跟踪
  77. self.active_chains[callback_task_id] = task_chain
  78. # 5. 初始化进度管理
  79. asyncio.run(self.progress_manager.initialize_progress(
  80. callback_task_id=callback_task_id,
  81. user_id=task_file_info.user_id,
  82. stages=[]
  83. ))
  84. # 6. 启动处理流程(同步执行)
  85. self._process_task_chain_sync(task_chain, task_file_info, task_file_info.file_type)
  86. # logger.info(f"提交文档处理任务: {callback_task_id}")
  87. logger.info(f"施工方案审查任务已完成! ")
  88. logger.info(f"文件ID: {task_file_info.file_id}")
  89. logger.info(f"文件名:{task_file_info.file_name}")
  90. except Exception as e:
  91. logger.error(f"提交文档处理任务失败: {str(e)}")
  92. raise
  93. def _process_task_chain_sync(self, task_chain: TaskChain, task_file_info: TaskFileInfo, file_type: str):
  94. """同步处理文档任务链(用于Celery worker)"""
  95. try:
  96. file_content = task_file_info.file_content
  97. # 阶段1:文档处理(串行)
  98. document_workflow = DocumentWorkflow(
  99. task_file_info=task_file_info,
  100. progress_manager=self.progress_manager,
  101. redis_duplicate_checker=self.redis_duplicate_checker
  102. )
  103. # 同步执行文档处理
  104. loop = asyncio.new_event_loop()
  105. asyncio.set_event_loop(loop)
  106. doc_result = loop.run_until_complete(document_workflow.execute(file_content, file_type))
  107. loop.close()
  108. task_chain.results['document'] = doc_result
  109. # 阶段2:AI审查(内部并发)
  110. task_chain.update_stage("ai_review")
  111. structured_content = doc_result['structured_content']
  112. # 读取AI审查配置
  113. import configparser
  114. config = configparser.ConfigParser()
  115. config.read('config/config.ini', encoding='utf-8')
  116. max_review_units = config.getint('ai_review', 'MAX_REVIEW_UNITS', fallback=None)
  117. if max_review_units == 0: # 如果配置为0,表示审查所有
  118. max_review_units = None
  119. review_mode = config.get('ai_review', 'REVIEW_MODE', fallback='all')
  120. logger.info(f"AI审查配置: 最大审查数量={max_review_units}, 审查模式={review_mode}")
  121. ai_workflow = AIReviewWorkflow(
  122. task_file_info=task_file_info,
  123. structured_content=structured_content,
  124. progress_manager=self.progress_manager,
  125. max_review_units=max_review_units,
  126. review_mode=review_mode
  127. )
  128. # 同步执行AI审查
  129. loop = asyncio.new_event_loop()
  130. asyncio.set_event_loop(loop)
  131. ai_result = loop.run_until_complete(ai_workflow.execute())
  132. loop.close()
  133. task_chain.results['ai_review'] = ai_result
  134. # # 阶段3:报告生成(串行)
  135. # task_chain.current_stage = "report_generation"
  136. # report_workflow = ReportWorkflow(
  137. # file_id=task_chain.file_id,
  138. # callback_task_id=task_chain.callback_task_id,
  139. # user_id=task_chain.user_id,
  140. # ai_review_results=ai_result,
  141. # progress_manager=self.progress_manager
  142. # )
  143. # # 同步执行报告生成
  144. # loop = asyncio.new_event_loop()
  145. # asyncio.set_event_loop(loop)
  146. # report_result = loop.run_until_complete(report_workflow.execute())
  147. # loop.close()
  148. # task_chain.results['report'] = report_result
  149. # 完成任务链
  150. task_chain.complete_processing()
  151. # 清理任务注册
  152. asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
  153. # 通知SSE连接任务完成
  154. asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id, task_chain.user_id))
  155. # 清理Redis文件缓存
  156. try:
  157. from foundation.utils.redis_utils import delete_file_info
  158. asyncio.run(delete_file_info(task_chain.file_id))
  159. logger.info(f"已清理Redis文件缓存: {task_chain.file_id}")
  160. except Exception as e:
  161. logger.warning(f"清理Redis文件缓存失败: {str(e)}")
  162. logger.info(f"文档处理任务链完成: {task_chain.callback_task_id}")
  163. return task_chain.results
  164. except Exception as e:
  165. # 标记任务失败
  166. task_chain.fail_processing(str(e))
  167. logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")
  168. # 清理任务注册
  169. asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
  170. # 清理Redis文件缓存(即使失败也清理)
  171. try:
  172. from foundation.utils.redis_utils import delete_file_info
  173. asyncio.run(delete_file_info(task_chain.file_id))
  174. logger.info(f"已清理Redis文件缓存: {task_chain.file_id}")
  175. except Exception as cleanup_error:
  176. logger.warning(f"清理Redis文件缓存失败: {str(cleanup_error)}")
  177. # 通知SSE连接任务失败
  178. error_result = {
  179. "error": str(e),
  180. "status": "failed",
  181. "timestamp": datetime.now().isoformat()
  182. }
  183. current_data = {
  184. "status": "failed",
  185. "result": error_result
  186. }
  187. asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id, task_chain.user_id, current_data))
  188. raise
  189. finally:
  190. # 清理活跃任务
  191. if task_chain.callback_task_id in self.active_chains:
  192. del self.active_chains[task_chain.callback_task_id]