task_control.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. """
  2. 施工方案审查任务控制接口
  3. 提供任务终止、查询等控制功能
  4. """
  5. import asyncio
  6. from typing import List, Optional, Dict, Any
  7. from pydantic import BaseModel, Field
  8. from fastapi import APIRouter, HTTPException, Query
  9. from foundation.observability.logger.loggering import server_logger as logger
  10. from core.base.workflow_manager import WorkflowManager
  11. task_control_router = APIRouter(prefix="/sgsc", tags=["任务控制"])
  12. # 初始化工作流管理器
  13. workflow_manager = WorkflowManager()
  14. class TerminateTaskRequest(BaseModel):
  15. """终止任务请求模型"""
  16. callback_task_id: str = Field(..., description="任务回调ID")
  17. operator: str = Field(..., description="操作人(用户ID)")
  18. class Config:
  19. extra = "forbid"
  20. class TerminateTaskResponse(BaseModel):
  21. """终止任务响应模型"""
  22. success: bool = Field(..., description="操作是否成功")
  23. message: str = Field(..., description="操作消息")
  24. task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息")
  25. class TaskListResponse(BaseModel):
  26. """任务列表响应模型"""
  27. total: int = Field(..., description="活跃任务总数")
  28. tasks: List[Dict[str, Any]] = Field(..., description="任务列表")
  29. class TaskInfoResponse(BaseModel):
  30. """任务信息响应模型"""
  31. exists: bool = Field(..., description="任务是否存在")
  32. task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息")
  33. @task_control_router.post("/task/terminate", response_model=TerminateTaskResponse)
  34. async def terminate_task(request: TerminateTaskRequest):
  35. """
  36. 终止正在执行的审查任务
  37. Args:
  38. request: 终止任务请求
  39. Returns:
  40. TerminateTaskResponse: 终止结果
  41. Note:
  42. - 终止信号通过 Redis 存储,支持跨进程
  43. - 任务会在当前节点完成后停止
  44. - 前端可以通过 SSE 接收到 terminated 状态
  45. """
  46. try:
  47. logger.info(f"收到任务终止请求: {request.callback_task_id}, 操作人: {request.operator}")
  48. # 设置终止信号
  49. result = await workflow_manager.set_terminate_signal(
  50. callback_task_id=request.callback_task_id,
  51. operator=request.operator
  52. )
  53. return TerminateTaskResponse(
  54. success=result["success"],
  55. message=result["message"],
  56. task_info=result.get("task_info")
  57. )
  58. except Exception as e:
  59. logger.error(f"终止任务失败: {str(e)}", exc_info=True)
  60. raise HTTPException(status_code=500, detail=f"终止任务失败: {str(e)}")
  61. @task_control_router.get("/task/list", response_model=TaskListResponse)
  62. async def get_active_tasks():
  63. """
  64. 获取当前活跃任务列表
  65. Returns:
  66. TaskListResponse: 活跃任务列表
  67. Note:
  68. 只返回 status=processing 的任务
  69. """
  70. try:
  71. logger.info("查询活跃任务列表")
  72. # 获取活跃任务
  73. active_tasks = await workflow_manager.get_active_tasks()
  74. return TaskListResponse(
  75. total=len(active_tasks),
  76. tasks=active_tasks
  77. )
  78. except Exception as e:
  79. logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
  80. raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}")
  81. @task_control_router.get("/task/info", response_model=TaskInfoResponse)
  82. async def get_task_info(
  83. callback_task_id: str = Query(..., description="任务回调ID")
  84. ):
  85. """
  86. 获取指定任务的详细信息
  87. Args:
  88. callback_task_id: 任务回调ID
  89. Returns:
  90. TaskInfoResponse: 任务信息
  91. Note:
  92. 如果任务不存在或已完成,exists 返回 False
  93. """
  94. try:
  95. logger.info(f"查询任务信息: {callback_task_id}")
  96. # 获取任务信息
  97. task_info = await workflow_manager.get_task_info(callback_task_id)
  98. if task_info:
  99. return TaskInfoResponse(
  100. exists=True,
  101. task_info=task_info
  102. )
  103. else:
  104. return TaskInfoResponse(
  105. exists=False,
  106. task_info=None
  107. )
  108. except Exception as e:
  109. logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
  110. raise HTTPException(status_code=500, detail=f"获取任务信息失败: {str(e)}")
  111. # ==================== 测试接口 ====================
  112. @task_control_router.get("/task/health")
  113. async def health_check():
  114. """健康检查接口"""
  115. return {
  116. "status": "ok",
  117. "service": "task_control",
  118. "message": "任务控制服务运行正常"
  119. }