tasks.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. """
  2. Celery任务定义
  3. 只负责任务调度,具体业务逻辑由WorkflowManager处理
  4. """
  5. from celery import current_task
  6. from .celery_app import app
  7. from core.base.workflow_manager import WorkflowManager
  8. from foundation.logger.loggering import server_logger as logger
  9. from foundation.utils.time_statistics import track_execution_time
  10. @app.task(bind=True)
  11. def submit_task_processing_task(self, file_info: dict):
  12. """
  13. 提交任务处理到Celery队列
  14. 这个任务只负责调用WorkflowManager,不包含业务逻辑
  15. """
  16. import traceback
  17. # 添加调试信息
  18. logger.info("=== Celery任务接收调试 ===")
  19. logger.info(f"队列ID: {self.request.id}")
  20. logger.info(f"文件ID: {file_info.get('file_id')}")
  21. logger.info(f"回调任务ID: {file_info.get('callback_task_id')}")
  22. logger.info("=== 任务接收调用栈 ===")
  23. for line in traceback.format_stack():
  24. logger.debug(f" {line.strip()}")
  25. logger.info("=== 调用栈结束 ===")
  26. try:
  27. # 更新任务状态 - 开始处理
  28. self.update_state(
  29. state='PROGRESS',
  30. meta={
  31. 'current': 0,
  32. 'total': 100,
  33. 'status': '开始处理文档',
  34. 'file_id': file_info.get('file_id')
  35. }
  36. )
  37. logger.info(f"开始执行业务逻辑,文件ID: {file_info.get('file_id')}")
  38. # 创建独立的WorkflowManager实例执行业务逻辑
  39. workflow_manager = WorkflowManager(
  40. max_concurrent_docs=1, # Celery worker中单任务执行
  41. max_concurrent_reviews=5
  42. )
  43. # 同步执行(Celery worker本身就是独立的进程)
  44. result = workflow_manager.submit_task_processing_sync(file_info)
  45. # 更新任务状态 - 完成
  46. self.update_state(
  47. state='PROGRESS',
  48. meta={
  49. 'current': 100,
  50. 'total': 100,
  51. 'status': '处理完成',
  52. 'file_id': file_info.get('file_id')
  53. }
  54. )
  55. return {
  56. 'status': 'success',
  57. 'file_id': file_info.get('file_id'),
  58. 'callback_task_id': file_info.get('callback_task_id'),
  59. 'result': result
  60. }
  61. except Exception as e:
  62. # 记录错误并重试
  63. logger.error(f"任务处理失败: {str(e)}")
  64. logger.exception("详细错误信息:")
  65. # 自动重试,延迟60秒,最多重试2次
  66. self.retry(countdown=60, max_retries=2, exc=e)
  67. raise