| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- """
- 施工方案审查任务控制接口
- 提供任务终止、查询等控制功能
- """
- import asyncio
- from typing import List, Optional, Dict, Any
- from pydantic import BaseModel, Field
- from fastapi import APIRouter, HTTPException, Query
- from foundation.observability.logger.loggering import server_logger as logger
- from core.base.workflow_manager import WorkflowManager
- task_control_router = APIRouter(prefix="/sgsc", tags=["任务控制"])
- # 初始化工作流管理器
- workflow_manager = WorkflowManager()
- class TerminateTaskRequest(BaseModel):
- """终止任务请求模型"""
- callback_task_id: str = Field(..., description="任务回调ID")
- operator: str = Field(..., description="操作人(用户ID)")
- class Config:
- extra = "forbid"
- class TerminateTaskResponse(BaseModel):
- """终止任务响应模型"""
- success: bool = Field(..., description="操作是否成功")
- message: str = Field(..., description="操作消息")
- task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息")
- class TaskListResponse(BaseModel):
- """任务列表响应模型"""
- total: int = Field(..., description="活跃任务总数")
- tasks: List[Dict[str, Any]] = Field(..., description="任务列表")
- class TaskInfoResponse(BaseModel):
- """任务信息响应模型"""
- exists: bool = Field(..., description="任务是否存在")
- task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息")
- @task_control_router.post("/task/terminate", response_model=TerminateTaskResponse)
- async def terminate_task(request: TerminateTaskRequest):
- """
- 终止正在执行的审查任务
- Args:
- request: 终止任务请求
- Returns:
- TerminateTaskResponse: 终止结果
- Note:
- - 终止信号通过 Redis 存储,支持跨进程
- - 任务会在当前节点完成后停止
- - 前端可以通过 SSE 接收到 terminated 状态
- """
- try:
- logger.info(f"收到任务终止请求: {request.callback_task_id}, 操作人: {request.operator}")
- # 设置终止信号
- result = await workflow_manager.set_terminate_signal(
- callback_task_id=request.callback_task_id,
- operator=request.operator
- )
- return TerminateTaskResponse(
- success=result["success"],
- message=result["message"],
- task_info=result.get("task_info")
- )
- except Exception as e:
- logger.error(f"终止任务失败: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"终止任务失败: {str(e)}")
- @task_control_router.get("/task/list", response_model=TaskListResponse)
- async def get_active_tasks():
- """
- 获取当前活跃任务列表
- Returns:
- TaskListResponse: 活跃任务列表
- Note:
- 只返回 status=processing 的任务
- """
- try:
- logger.info("查询活跃任务列表")
- # 获取活跃任务
- active_tasks = await workflow_manager.get_active_tasks()
- return TaskListResponse(
- total=len(active_tasks),
- tasks=active_tasks
- )
- except Exception as e:
- logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}")
- @task_control_router.get("/task/info", response_model=TaskInfoResponse)
- async def get_task_info(
- callback_task_id: str = Query(..., description="任务回调ID")
- ):
- """
- 获取指定任务的详细信息
- Args:
- callback_task_id: 任务回调ID
- Returns:
- TaskInfoResponse: 任务信息
- Note:
- 如果任务不存在或已完成,exists 返回 False
- """
- try:
- logger.info(f"查询任务信息: {callback_task_id}")
- # 获取任务信息
- task_info = await workflow_manager.get_task_info(callback_task_id)
- if task_info:
- return TaskInfoResponse(
- exists=True,
- task_info=task_info
- )
- else:
- return TaskInfoResponse(
- exists=False,
- task_info=None
- )
- except Exception as e:
- logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"获取任务信息失败: {str(e)}")
- # ==================== 测试接口 ====================
- @task_control_router.get("/task/health")
- async def health_check():
- """健康检查接口"""
- return {
- "status": "ok",
- "service": "task_control",
- "message": "任务控制服务运行正常"
- }
|