tasks.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. """
  2. Celery任务定义
  3. 只负责任务调度,具体业务逻辑由WorkflowManager处理
  4. """
  5. from celery import current_task
  6. from .celery_app import app
  7. from core.base.workflow_manager import workflow_manager
  8. from foundation.observability.logger.loggering import review_logger, write_logger
  9. from foundation.observability.monitoring.time_statistics import track_execution_time
  10. @app.task(bind=True)
  11. def submit_task_processing_task(self, file_info: dict, _system_trace_id: str = None):
  12. """
  13. 提交任务处理到Celery队列
  14. 这个任务只负责调用WorkflowManager,不包含业务逻辑
  15. """
  16. import traceback
  17. logger = review_logger # 使用审查模块专用logger
  18. # 恢复trace_id上下文
  19. if _system_trace_id:
  20. from foundation.infrastructure.tracing import TraceContext
  21. TraceContext.set_trace_id(_system_trace_id)
  22. logger.info(f"Celery任务恢复")
  23. # 添加调试信息
  24. logger.info("=== Celery任务接收调试 ===")
  25. logger.info(f"队列ID: {self.request.id}")
  26. logger.info(f"文件ID: {file_info.get('file_id')}")
  27. logger.info(f"回调任务ID: {file_info.get('callback_task_id')}")
  28. logger.info("=== 任务接收调用栈 ===")
  29. for line in traceback.format_stack():
  30. logger.debug(f" {line.strip()}")
  31. logger.info("=== 调用栈结束 ===")
  32. try:
  33. # 更新任务状态 - 开始处理
  34. self.update_state(
  35. state='current',
  36. meta={
  37. 'current': 0,
  38. 'total': 100,
  39. 'status': '开始处理文档',
  40. 'file_id': file_info.get('file_id')
  41. }
  42. )
  43. logger.info(f"开始执行业务逻辑,文件ID: {file_info.get('file_id')}")
  44. # 复用进程内的 WorkflowManager 单例(Celery worker 与主进程是不同进程,
  45. # 单例在 worker 进程内独立创建一次,并发参数沿用模块级默认值)
  46. # 同步执行(Celery worker本身就是独立的进程)
  47. result = workflow_manager.submit_construction_review_task_processing_sync(file_info)
  48. # 更新任务状态 - 完成
  49. self.update_state(
  50. state='current',
  51. meta={
  52. 'current': 100,
  53. 'total': 100,
  54. 'status': '处理完成',
  55. 'file_id': file_info.get('file_id')
  56. }
  57. )
  58. return {
  59. 'status': 'success',
  60. 'file_id': file_info.get('file_id'),
  61. 'callback_task_id': file_info.get('callback_task_id'),
  62. 'result': result
  63. }
  64. except Exception as e:
  65. # 记录错误并重试
  66. logger.error(f"任务处理失败: {str(e)}")
  67. logger.exception("详细错误信息:")
  68. # 自动重试,延迟60秒,最多重试2次
  69. self.retry(countdown=60, max_retries=2, exc=e)
  70. raise
  71. # ==================== 施工方案编写任务 ====================
  72. @app.task(bind=True)
  73. def submit_outline_generation_task(self, task_info: dict, _system_trace_id: str = None):
  74. """
  75. 提交大纲生成任务到 Celery 队列
  76. 这个任务只负责调用 WorkflowManager,不包含业务逻辑
  77. """
  78. import traceback
  79. logger = write_logger # 使用编写模块专用logger
  80. # 恢复 trace_id 上下文
  81. if _system_trace_id:
  82. from foundation.infrastructure.tracing import TraceContext
  83. TraceContext.set_trace_id(_system_trace_id)
  84. logger.info(f"大纲生成 Celery 任务恢复 trace_id: {_system_trace_id}")
  85. logger.info("=== 大纲生成 Celery 任务接收 ===")
  86. logger.info(f"队列ID: {self.request.id}")
  87. logger.info(f"用户ID: {task_info.get('user_id')}")
  88. logger.info(f"项目: {task_info.get('project_info', {}).get('project_name', 'unknown')}")
  89. try:
  90. # 更新任务状态 - 开始处理
  91. self.update_state(
  92. state='current',
  93. meta={
  94. 'current': 0,
  95. 'total': 100,
  96. 'status': '开始生成大纲',
  97. 'callback_task_id': task_info.get('callback_task_id'),
  98. 'project_name': task_info.get('project_info', {}).get('project_name', '')
  99. }
  100. )
  101. logger.info(f"开始执行大纲生成业务逻辑")
  102. # 复用进程内的 WorkflowManager 单例
  103. # 同步执行
  104. result = workflow_manager.submit_outline_generation_sync(task_info)
  105. # 更新任务状态 - 完成
  106. self.update_state(
  107. state='current',
  108. meta={
  109. 'current': 100,
  110. 'total': 100,
  111. 'status': '大纲生成完成',
  112. 'callback_task_id': result.get('callback_task_id'),
  113. 'overall_task_status': result.get('overall_task_status')
  114. }
  115. )
  116. return {
  117. 'status': 'success',
  118. 'callback_task_id': result.get('callback_task_id'),
  119. 'overall_task_status': result.get('overall_task_status'),
  120. 'result': result
  121. }
  122. except Exception as e:
  123. logger.error(f"大纲生成任务失败: {str(e)}")
  124. logger.exception("详细错误信息:")
  125. # 自动重试,延迟60秒,最多重试2次
  126. self.retry(countdown=60, max_retries=2, exc=e)
  127. raise