# -*- coding: utf-8 -*- """ 大纲生成任务取消接口 提供取消正在执行的大纲生成任务功能 """ import json import time from typing import Optional, Dict, Any from pydantic import BaseModel, Field from fastapi import APIRouter, HTTPException from foundation.observability.logger.loggering import write_logger as logger from foundation.infrastructure.tracing import TraceContext, auto_trace from core.base.workflow_manager import WorkflowManager from core.base.sse_manager import unified_sse_manager # 创建路由 task_cancel_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"]) # 初始化工作流管理器 workflow_manager = WorkflowManager( max_concurrent_docs=3, max_concurrent_reviews=5 ) class TaskCancelRequest(BaseModel): """任务取消请求模型 示例请求: { "task_id": "outline_abc123456789", "user_id": "user-001", "cancel_reason": "用户主动取消" } """ task_id: str = Field(..., description="任务ID", example="outline_abc123456789") user_id: str = Field(..., description="用户ID", example="user-001") cancel_reason: Optional[str] = Field( default="用户主动取消", description="取消原因" ) class Config: extra = "forbid" class TaskCancelResponse(BaseModel): """任务取消响应模型""" code: int = Field(..., description="状态码") message: str = Field(..., description="状态消息") data: Optional[Dict[str, Any]] = Field(None, description="响应数据") def validate_user_id(user_id: str) -> None: """验证用户标识""" supported_users = {'user-001', 'user-002', 'user-003'} if user_id not in supported_users: raise HTTPException( status_code=403, detail={ "code": "CANCEL_001", "error_type": "INVALID_USER", "message": "用户标识未提供或无效" } ) @task_cancel_router.post("/task_cancel", response_model=TaskCancelResponse) @auto_trace(generate_if_missing=True) async def task_cancel(request: TaskCancelRequest): """ 取消大纲生成任务 取消正在执行的大纲生成任务,支持取消本服务的生成任务。 【修复】现在支持取消预注册状态(pending)的任务,即任务提交后、Worker 执行前的时间段。 Args: request: 任务取消请求参数 Returns: 取消结果 Example: POST /sgbx/task_cancel { "task_id": "outline_abc123456789", "user_id": "user-001", "cancel_reason": "用户主动取消" } """ trace_id = f"cancel_{request.task_id}" TraceContext.set_trace_id(trace_id) try: logger.info(f"[{trace_id}] 接收任务取消请求: task_id={request.task_id}, user_id={request.user_id}") # 参数验证 validate_user_id(request.user_id) # 检查任务是否存在 try: task_info = await workflow_manager.get_outline_sgbx_task_info(request.task_id) except Exception as e: logger.warning(f"[{trace_id}] 获取任务信息异常: {e}") task_info = None if not task_info: return TaskCancelResponse( code=404, message="任务不存在或已完成", data={ "task_id": request.task_id, "status": "not_found" } ) # 检查任务状态 task_status = task_info.get("status") or task_info.get("overall_task_status", "unknown") if task_status == "cancelled": return TaskCancelResponse( code=200, message="任务已处于取消状态", data={ "task_id": request.task_id, "status": "already_cancelled", "cancelled_at": task_info.get("cancelled_at") } ) if task_status in ["completed", "failed"]: return TaskCancelResponse( code=400, message=f"任务已{task_status},无法取消", data={ "task_id": request.task_id, "current_status": task_status } ) # 【修复】使用 workflow_manager 的 set_outline_terminate_signal 方法 # 支持 pending(预注册)和 processing(执行中)两种状态 cancelled_at = int(time.time()) result = await workflow_manager.set_outline_terminate_signal( callback_task_id=request.task_id, operator=request.user_id ) if not result.get("success"): logger.warning(f"[{trace_id}] 设置终止信号失败: {result.get('message')}") return TaskCancelResponse( code=400, message=result.get("message", "取消任务失败"), data={ "task_id": request.task_id, "current_status": task_info.get("status") } ) logger.info(f"[{trace_id}] 终止信号已设置: {request.task_id}") # 【修复】如果是预注册状态(pending),任务已被直接取消,无需其他操作 is_pre_registered = task_info.get("is_pre_registered", False) if is_pre_registered or task_status == "pending": logger.info(f"[{trace_id}] 预注册任务已被取消: {request.task_id}") # 更新进度信息 try: await workflow_manager.progress_manager.update_stage_progress( callback_task_id=request.task_id, overall_task_status="cancelled", status="cancelled", message=f"任务已被用户取消: {request.cancel_reason}" ) except Exception as e: logger.warning(f"[{trace_id}] 更新进度信息失败: {e}") return TaskCancelResponse( code=200, message="任务已成功取消(未开始执行)", data={ "task_id": request.task_id, "status": "cancelled", "cancelled_at": cancelled_at, "cancel_reason": request.cancel_reason, "cancelled_by": request.user_id, "is_pre_registered": True } ) # 对于正在执行的任务(processing),设置额外的取消标志(兼容旧版) try: import redis.asyncio as redis_async from redis.asyncio.connection import ConnectionPool pool = ConnectionPool( host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True, max_connections=20, socket_connect_timeout=10, socket_timeout=10, retry_on_timeout=True, health_check_interval=30 ) redis_client = redis_async.Redis(connection_pool=pool) # 设置终止标志(兼容旧版 outline_workflow 的检查) terminate_data = json.dumps({ "cancelled": True, "cancelled_by": request.user_id, "cancel_reason": request.cancel_reason, "cancelled_at": cancelled_at }) await redis_client.set(f"terminate:{request.task_id}", terminate_data, ex=3600) await redis_client.close() await pool.disconnect() except Exception as e: logger.warning(f"[{trace_id}] 设置兼容终止标志失败: {e}") # 不影响主流程,继续执行 # 尝试终止 Celery 任务 celery_task_id = task_info.get("celery_task_id") or task_info.get("celery_id") if celery_task_id: try: from celery import current_app as celery_app celery_app.control.revoke(celery_task_id, terminate=True) logger.info(f"[{trace_id}] Celery 终止信号已发送: {celery_task_id}") except Exception as e: logger.warning(f"[{trace_id}] 终止 Celery 任务失败: {e}") # 发送取消事件到 SSE try: cancel_event = { "callback_task_id": request.task_id, "status": "cancelled", "overall_task_status": "cancelled", "message": f"任务已被用户取消: {request.cancel_reason}", "cancelled_at": cancelled_at, "cancelled_by": request.user_id } await unified_sse_manager.send_progress(request.task_id, cancel_event) await unified_sse_manager.close_connection(request.task_id) except Exception as e: logger.warning(f"[{trace_id}] 关闭 SSE 连接失败: {e}") return TaskCancelResponse( code=200, message="任务取消成功", data={ "task_id": request.task_id, "status": "cancelled", "cancelled_at": cancelled_at, "cancel_reason": request.cancel_reason, "cancelled_by": request.user_id } ) except HTTPException: raise except Exception as e: logger.error(f"[{trace_id}] 取消任务异常: {str(e)}", exc_info=True) return TaskCancelResponse( code=500, message=f"取消任务失败: {str(e)}", data={"task_id": request.task_id} ) @task_cancel_router.get("/task_status") @auto_trace(generate_if_missing=True) async def get_task_status( task_id: str, user_id: str ): """ 获取任务状态 Args: task_id: 任务ID user_id: 用户ID Returns: 任务状态信息 """ try: logger.info(f"查询任务状态: task_id={task_id}") # 验证用户 validate_user_id(user_id) # 获取任务信息 task_info = await workflow_manager.get_outline_sgbx_task_info(task_id) if not task_info: return { "code": 404, "message": "任务不存在或已完成", "data": None } return { "code": 200, "message": "查询成功", "data": { "task_id": task_id, "status": task_info.get("status") or task_info.get("overall_task_status"), "progress": task_info.get("current", 0), "message": task_info.get("message", ""), "updated_at": task_info.get("updated_at") } } except Exception as e: logger.error(f"查询任务状态失败: {str(e)}", exc_info=True) return { "code": 500, "message": f"查询失败: {str(e)}", "data": None }