| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- from .celery_app import app
- from core.construction_write.workflow_manager import workflow_manager
- from foundation.infrastructure.tracing import TraceContext
- from foundation.observability.logger.loggering import write_logger as logger
- def _is_non_retryable_error(error: Exception) -> bool:
- error_str = str(error).lower()
- non_retryable_tokens = [
- "401",
- "403",
- "unauthorized",
- "forbidden",
- "invalid api key",
- "incorrect api key",
- "authentication",
- "permission denied",
- ]
- return any(token in error_str for token in non_retryable_tokens)
- @app.task(bind=True, queue="construction_write")
- def submit_outline_generation_task(self, task_info: dict, _system_trace_id: str = None):
- """Celery task for construction-write outline generation."""
- # 恢复 trace_id 上下文
- if _system_trace_id:
- TraceContext.set_trace_id(_system_trace_id)
- logger.info(f"Celery任务恢复 trace_id: {_system_trace_id}")
- callback_task_id = task_info.get("callback_task_id")
- project_name = task_info.get("project_info", {}).get("project_name", "")
- user_id = task_info.get("user_id", "")
- logger.info(f"=== Celery任务接收调试 ===")
- logger.info(f"队列ID: {self.request.id}")
- logger.info(f"回调任务ID: {callback_task_id}")
- logger.info(f"用户ID: {user_id}")
- logger.info(f"项目: {project_name}")
- logger.info(f"开始执行大纲生成业务逻辑")
- try:
- self.update_state(
- state="current",
- meta={
- "current": 0,
- "total": 100,
- "status": "outline_generation_started",
- "callback_task_id": callback_task_id,
- "project_name": project_name,
- },
- )
- result = workflow_manager.submit_outline_generation_sync(task_info)
- self.update_state(
- state="current",
- meta={
- "current": 100,
- "total": 100,
- "status": "outline_generation_completed",
- "callback_task_id": result.get("callback_task_id"),
- "overall_task_status": result.get("overall_task_status"),
- },
- )
- return {
- "status": "success",
- "callback_task_id": result.get("callback_task_id"),
- "overall_task_status": result.get("overall_task_status"),
- "result": result,
- }
- except Exception as exc:
- logger.error(f"大纲生成任务失败: {exc}")
- logger.exception("详细错误信息:")
- if _is_non_retryable_error(exc):
- logger.error("检测到不可重试错误,Celery任务不再重试")
- raise
- self.retry(countdown=60, max_retries=2, exc=exc)
- raise
|