tasks.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from .celery_app import app
  2. from core.construction_write.workflow_manager import workflow_manager
  3. from foundation.infrastructure.tracing import TraceContext
  4. from foundation.observability.logger.loggering import write_logger as logger
  5. def _is_non_retryable_error(error: Exception) -> bool:
  6. error_str = str(error).lower()
  7. non_retryable_tokens = [
  8. "401",
  9. "403",
  10. "unauthorized",
  11. "forbidden",
  12. "invalid api key",
  13. "incorrect api key",
  14. "authentication",
  15. "permission denied",
  16. ]
  17. return any(token in error_str for token in non_retryable_tokens)
  18. @app.task(bind=True, queue="construction_write")
  19. def submit_outline_generation_task(self, task_info: dict, _system_trace_id: str = None):
  20. """Celery task for construction-write outline generation."""
  21. # 恢复 trace_id 上下文
  22. if _system_trace_id:
  23. TraceContext.set_trace_id(_system_trace_id)
  24. logger.info(f"Celery任务恢复 trace_id: {_system_trace_id}")
  25. callback_task_id = task_info.get("callback_task_id")
  26. project_name = task_info.get("project_info", {}).get("project_name", "")
  27. user_id = task_info.get("user_id", "")
  28. logger.info(f"=== Celery任务接收调试 ===")
  29. logger.info(f"队列ID: {self.request.id}")
  30. logger.info(f"回调任务ID: {callback_task_id}")
  31. logger.info(f"用户ID: {user_id}")
  32. logger.info(f"项目: {project_name}")
  33. logger.info(f"开始执行大纲生成业务逻辑")
  34. try:
  35. self.update_state(
  36. state="current",
  37. meta={
  38. "current": 0,
  39. "total": 100,
  40. "status": "outline_generation_started",
  41. "callback_task_id": callback_task_id,
  42. "project_name": project_name,
  43. },
  44. )
  45. result = workflow_manager.submit_outline_generation_sync(task_info)
  46. self.update_state(
  47. state="current",
  48. meta={
  49. "current": 100,
  50. "total": 100,
  51. "status": "outline_generation_completed",
  52. "callback_task_id": result.get("callback_task_id"),
  53. "overall_task_status": result.get("overall_task_status"),
  54. },
  55. )
  56. return {
  57. "status": "success",
  58. "callback_task_id": result.get("callback_task_id"),
  59. "overall_task_status": result.get("overall_task_status"),
  60. "result": result,
  61. }
  62. except Exception as exc:
  63. logger.error(f"大纲生成任务失败: {exc}")
  64. logger.exception("详细错误信息:")
  65. if _is_non_retryable_error(exc):
  66. logger.error("检测到不可重试错误,Celery任务不再重试")
  67. raise
  68. self.retry(countdown=60, max_retries=2, exc=exc)
  69. raise