| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- # -*- coding: utf-8 -*-
- """
- 大纲生成任务取消接口
- 提供取消正在执行的大纲生成任务功能
- """
- import json
- import time
- from typing import Optional, Dict, Any
- from pydantic import BaseModel, Field
- from fastapi import APIRouter, HTTPException
- from foundation.observability.logger.loggering import write_logger as logger
- from foundation.infrastructure.tracing import TraceContext, auto_trace
- from core.base.workflow_manager import WorkflowManager
- from core.base.sse_manager import unified_sse_manager
- # 创建路由
- task_cancel_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
- # 初始化工作流管理器
- workflow_manager = WorkflowManager(
- max_concurrent_docs=3,
- max_concurrent_reviews=5
- )
- class TaskCancelRequest(BaseModel):
- """任务取消请求模型
-
- 示例请求:
- {
- "task_id": "outline_abc123456789",
- "user_id": "user-001",
- "cancel_reason": "用户主动取消"
- }
- """
- task_id: str = Field(..., description="任务ID", example="outline_abc123456789")
- user_id: str = Field(..., description="用户ID", example="user-001")
- cancel_reason: Optional[str] = Field(
- default="用户主动取消",
- description="取消原因"
- )
- class Config:
- extra = "forbid"
- class TaskCancelResponse(BaseModel):
- """任务取消响应模型"""
- code: int = Field(..., description="状态码")
- message: str = Field(..., description="状态消息")
- data: Optional[Dict[str, Any]] = Field(None, description="响应数据")
- def validate_user_id(user_id: str) -> None:
- """验证用户标识"""
- supported_users = {'user-001', 'user-002', 'user-003'}
- if user_id not in supported_users:
- raise HTTPException(
- status_code=403,
- detail={
- "code": "CANCEL_001",
- "error_type": "INVALID_USER",
- "message": "用户标识未提供或无效"
- }
- )
- @task_cancel_router.post("/task_cancel", response_model=TaskCancelResponse)
- @auto_trace(generate_if_missing=True)
- async def task_cancel(request: TaskCancelRequest):
- """
- 取消大纲生成任务
-
- 取消正在执行的大纲生成任务,支持取消本服务的生成任务。
- 【修复】现在支持取消预注册状态(pending)的任务,即任务提交后、Worker 执行前的时间段。
-
- Args:
- request: 任务取消请求参数
-
- Returns:
- 取消结果
-
- Example:
- POST /sgbx/task_cancel
- {
- "task_id": "outline_abc123456789",
- "user_id": "user-001",
- "cancel_reason": "用户主动取消"
- }
- """
- trace_id = f"cancel_{request.task_id}"
- TraceContext.set_trace_id(trace_id)
-
- try:
- logger.info(f"[{trace_id}] 接收任务取消请求: task_id={request.task_id}, user_id={request.user_id}")
-
- # 参数验证
- validate_user_id(request.user_id)
-
- # 检查任务是否存在
- try:
- task_info = await workflow_manager.get_outline_sgbx_task_info(request.task_id)
- except Exception as e:
- logger.warning(f"[{trace_id}] 获取任务信息异常: {e}")
- task_info = None
-
- if not task_info:
- return TaskCancelResponse(
- code=404,
- message="任务不存在或已完成",
- data={
- "task_id": request.task_id,
- "status": "not_found"
- }
- )
-
- # 检查任务状态
- task_status = task_info.get("status") or task_info.get("overall_task_status", "unknown")
-
- if task_status == "cancelled":
- return TaskCancelResponse(
- code=200,
- message="任务已处于取消状态",
- data={
- "task_id": request.task_id,
- "status": "already_cancelled",
- "cancelled_at": task_info.get("cancelled_at")
- }
- )
-
- if task_status in ["completed", "failed"]:
- return TaskCancelResponse(
- code=400,
- message=f"任务已{task_status},无法取消",
- data={
- "task_id": request.task_id,
- "current_status": task_status
- }
- )
-
- # 【修复】使用 workflow_manager 的 set_outline_terminate_signal 方法
- # 支持 pending(预注册)和 processing(执行中)两种状态
- cancelled_at = int(time.time())
-
- result = await workflow_manager.set_outline_terminate_signal(
- callback_task_id=request.task_id,
- operator=request.user_id
- )
-
- if not result.get("success"):
- logger.warning(f"[{trace_id}] 设置终止信号失败: {result.get('message')}")
- return TaskCancelResponse(
- code=400,
- message=result.get("message", "取消任务失败"),
- data={
- "task_id": request.task_id,
- "current_status": task_info.get("status")
- }
- )
-
- logger.info(f"[{trace_id}] 终止信号已设置: {request.task_id}")
-
- # 【修复】如果是预注册状态(pending),任务已被直接取消,无需其他操作
- is_pre_registered = task_info.get("is_pre_registered", False)
- if is_pre_registered or task_status == "pending":
- logger.info(f"[{trace_id}] 预注册任务已被取消: {request.task_id}")
-
- # 更新进度信息
- try:
- await workflow_manager.progress_manager.update_stage_progress(
- callback_task_id=request.task_id,
- overall_task_status="cancelled",
- status="cancelled",
- message=f"任务已被用户取消: {request.cancel_reason}"
- )
- except Exception as e:
- logger.warning(f"[{trace_id}] 更新进度信息失败: {e}")
-
- return TaskCancelResponse(
- code=200,
- message="任务已成功取消(未开始执行)",
- data={
- "task_id": request.task_id,
- "status": "cancelled",
- "cancelled_at": cancelled_at,
- "cancel_reason": request.cancel_reason,
- "cancelled_by": request.user_id,
- "is_pre_registered": True
- }
- )
-
- # 对于正在执行的任务(processing),设置额外的取消标志(兼容旧版)
- try:
- import redis.asyncio as redis_async
- from redis.asyncio.connection import ConnectionPool
-
- pool = ConnectionPool(
- host='127.0.0.1',
- port=6379,
- password='123456',
- db=0,
- decode_responses=True,
- max_connections=20,
- socket_connect_timeout=10,
- socket_timeout=10,
- retry_on_timeout=True,
- health_check_interval=30
- )
-
- redis_client = redis_async.Redis(connection_pool=pool)
-
- # 设置终止标志(兼容旧版 outline_workflow 的检查)
- terminate_data = json.dumps({
- "cancelled": True,
- "cancelled_by": request.user_id,
- "cancel_reason": request.cancel_reason,
- "cancelled_at": cancelled_at
- })
-
- await redis_client.set(f"terminate:{request.task_id}", terminate_data, ex=3600)
-
- await redis_client.close()
- await pool.disconnect()
-
- except Exception as e:
- logger.warning(f"[{trace_id}] 设置兼容终止标志失败: {e}")
- # 不影响主流程,继续执行
-
- # 尝试终止 Celery 任务
- celery_task_id = task_info.get("celery_task_id") or task_info.get("celery_id")
- if celery_task_id:
- try:
- from celery import current_app as celery_app
- celery_app.control.revoke(celery_task_id, terminate=True)
- logger.info(f"[{trace_id}] Celery 终止信号已发送: {celery_task_id}")
- except Exception as e:
- logger.warning(f"[{trace_id}] 终止 Celery 任务失败: {e}")
-
- # 发送取消事件到 SSE
- try:
- cancel_event = {
- "callback_task_id": request.task_id,
- "status": "cancelled",
- "overall_task_status": "cancelled",
- "message": f"任务已被用户取消: {request.cancel_reason}",
- "cancelled_at": cancelled_at,
- "cancelled_by": request.user_id
- }
- await unified_sse_manager.send_progress(request.task_id, cancel_event)
- await unified_sse_manager.close_connection(request.task_id)
- except Exception as e:
- logger.warning(f"[{trace_id}] 关闭 SSE 连接失败: {e}")
-
- return TaskCancelResponse(
- code=200,
- message="任务取消成功",
- data={
- "task_id": request.task_id,
- "status": "cancelled",
- "cancelled_at": cancelled_at,
- "cancel_reason": request.cancel_reason,
- "cancelled_by": request.user_id
- }
- )
-
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"[{trace_id}] 取消任务异常: {str(e)}", exc_info=True)
- return TaskCancelResponse(
- code=500,
- message=f"取消任务失败: {str(e)}",
- data={"task_id": request.task_id}
- )
- @task_cancel_router.get("/task_status")
- @auto_trace(generate_if_missing=True)
- async def get_task_status(
- task_id: str,
- user_id: str
- ):
- """
- 获取任务状态
-
- Args:
- task_id: 任务ID
- user_id: 用户ID
-
- Returns:
- 任务状态信息
- """
- try:
- logger.info(f"查询任务状态: task_id={task_id}")
-
- # 验证用户
- validate_user_id(user_id)
-
- # 获取任务信息
- task_info = await workflow_manager.get_outline_sgbx_task_info(task_id)
-
- if not task_info:
- return {
- "code": 404,
- "message": "任务不存在或已完成",
- "data": None
- }
-
- return {
- "code": 200,
- "message": "查询成功",
- "data": {
- "task_id": task_id,
- "status": task_info.get("status") or task_info.get("overall_task_status"),
- "progress": task_info.get("current", 0),
- "message": task_info.get("message", ""),
- "updated_at": task_info.get("updated_at")
- }
- }
-
- except Exception as e:
- logger.error(f"查询任务状态失败: {str(e)}", exc_info=True)
- return {
- "code": 500,
- "message": f"查询失败: {str(e)}",
- "data": None
- }
|