""" 施工方案审查任务控制接口 提供任务终止、查询等控制功能 """ import asyncio from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field from fastapi import APIRouter, HTTPException, Query from foundation.observability.logger.loggering import review_logger as logger from core.base.workflow_manager import WorkflowManager task_control_router = APIRouter(prefix="/sgsc", tags=["任务控制"]) # 初始化工作流管理器 workflow_manager = WorkflowManager() class TerminateTaskRequest(BaseModel): """终止任务请求模型""" callback_task_id: str = Field(..., description="任务回调ID") operator: str = Field(..., description="操作人(用户ID)") class Config: extra = "forbid" class TerminateTaskResponse(BaseModel): """终止任务响应模型""" success: bool = Field(..., description="操作是否成功") message: str = Field(..., description="操作消息") task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息") class TaskListResponse(BaseModel): """任务列表响应模型""" total: int = Field(..., description="活跃任务总数") tasks: List[Dict[str, Any]] = Field(..., description="任务列表") class TaskInfoResponse(BaseModel): """任务信息响应模型""" exists: bool = Field(..., description="任务是否存在") task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息") @task_control_router.post("/task/terminate", response_model=TerminateTaskResponse) async def terminate_task(request: TerminateTaskRequest): """ 终止正在执行的审查任务 Args: request: 终止任务请求 Returns: TerminateTaskResponse: 终止结果 Note: - 终止信号通过 Redis 存储,支持跨进程 - 任务会在当前节点完成后停止 - 前端可以通过 SSE 接收到 terminated 状态 """ try: logger.info(f"收到任务终止请求: {request.callback_task_id}, 操作人: {request.operator}") # 设置终止信号 result = await workflow_manager.set_terminate_signal( callback_task_id=request.callback_task_id, operator=request.operator ) return TerminateTaskResponse( success=result["success"], message=result["message"], task_info=result.get("task_info") ) except Exception as e: logger.error(f"终止任务失败: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"终止任务失败: {str(e)}") @task_control_router.get("/task/list", response_model=TaskListResponse) async def get_active_tasks(): """ 获取当前活跃任务列表 Returns: TaskListResponse: 活跃任务列表 Note: 只返回 status=processing 的任务 """ try: logger.info("查询活跃任务列表") # 获取活跃任务 active_tasks = await workflow_manager.get_active_tasks() return TaskListResponse( total=len(active_tasks), tasks=active_tasks ) except Exception as e: logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}") @task_control_router.get("/task/info", response_model=TaskInfoResponse) async def get_task_info( callback_task_id: str = Query(..., description="任务回调ID") ): """ 获取指定任务的详细信息 Args: callback_task_id: 任务回调ID Returns: TaskInfoResponse: 任务信息 Note: 如果任务不存在或已完成,exists 返回 False """ try: logger.info(f"查询任务信息: {callback_task_id}") # 获取任务信息 task_info = await workflow_manager.get_task_info(callback_task_id) if task_info: return TaskInfoResponse( exists=True, task_info=task_info ) else: return TaskInfoResponse( exists=False, task_info=None ) except Exception as e: logger.error(f"获取任务信息失败: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"获取任务信息失败: {str(e)}") # ==================== 测试接口 ==================== @task_control_router.get("/task/health") async def health_check(): """健康检查接口""" return { "status": "ok", "service": "task_control", "message": "任务控制服务运行正常" }