task_cancel_views.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. # -*- coding: utf-8 -*-
  2. """
  3. 大纲生成任务取消接口
  4. 提供取消正在执行的大纲生成任务功能
  5. """
  6. import json
  7. import time
  8. from typing import Optional, Dict, Any
  9. from pydantic import BaseModel, Field
  10. from fastapi import APIRouter, HTTPException
  11. from foundation.observability.logger.loggering import write_logger as logger
  12. from foundation.infrastructure.tracing import TraceContext, auto_trace
  13. from core.base.workflow_manager import WorkflowManager
  14. from core.base.sse_manager import unified_sse_manager
  15. # 创建路由
  16. task_cancel_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
  17. # 初始化工作流管理器
  18. workflow_manager = WorkflowManager(
  19. max_concurrent_docs=3,
  20. max_concurrent_reviews=5
  21. )
  22. class TaskCancelRequest(BaseModel):
  23. """任务取消请求模型
  24. 示例请求:
  25. {
  26. "task_id": "outline_abc123456789",
  27. "user_id": "user-001",
  28. "cancel_reason": "用户主动取消"
  29. }
  30. """
  31. task_id: str = Field(..., description="任务ID", example="outline_abc123456789")
  32. user_id: str = Field(..., description="用户ID", example="user-001")
  33. cancel_reason: Optional[str] = Field(
  34. default="用户主动取消",
  35. description="取消原因"
  36. )
  37. class Config:
  38. extra = "forbid"
  39. class TaskCancelResponse(BaseModel):
  40. """任务取消响应模型"""
  41. code: int = Field(..., description="状态码")
  42. message: str = Field(..., description="状态消息")
  43. data: Optional[Dict[str, Any]] = Field(None, description="响应数据")
  44. def validate_user_id(user_id: str) -> None:
  45. """验证用户标识"""
  46. supported_users = {'user-001', 'user-002', 'user-003'}
  47. if user_id not in supported_users:
  48. raise HTTPException(
  49. status_code=403,
  50. detail={
  51. "code": "CANCEL_001",
  52. "error_type": "INVALID_USER",
  53. "message": "用户标识未提供或无效"
  54. }
  55. )
  56. @task_cancel_router.post("/task_cancel", response_model=TaskCancelResponse)
  57. @auto_trace(generate_if_missing=True)
  58. async def task_cancel(request: TaskCancelRequest):
  59. """
  60. 取消大纲生成任务
  61. 取消正在执行的大纲生成任务,支持取消本服务的生成任务。
  62. 【修复】现在支持取消预注册状态(pending)的任务,即任务提交后、Worker 执行前的时间段。
  63. Args:
  64. request: 任务取消请求参数
  65. Returns:
  66. 取消结果
  67. Example:
  68. POST /sgbx/task_cancel
  69. {
  70. "task_id": "outline_abc123456789",
  71. "user_id": "user-001",
  72. "cancel_reason": "用户主动取消"
  73. }
  74. """
  75. trace_id = f"cancel_{request.task_id}"
  76. TraceContext.set_trace_id(trace_id)
  77. try:
  78. logger.info(f"[{trace_id}] 接收任务取消请求: task_id={request.task_id}, user_id={request.user_id}")
  79. # 参数验证
  80. validate_user_id(request.user_id)
  81. # 检查任务是否存在
  82. try:
  83. task_info = await workflow_manager.get_outline_sgbx_task_info(request.task_id)
  84. except Exception as e:
  85. logger.warning(f"[{trace_id}] 获取任务信息异常: {e}")
  86. task_info = None
  87. if not task_info:
  88. return TaskCancelResponse(
  89. code=404,
  90. message="任务不存在或已完成",
  91. data={
  92. "task_id": request.task_id,
  93. "status": "not_found"
  94. }
  95. )
  96. # 检查任务状态
  97. task_status = task_info.get("status") or task_info.get("overall_task_status", "unknown")
  98. if task_status == "cancelled":
  99. return TaskCancelResponse(
  100. code=200,
  101. message="任务已处于取消状态",
  102. data={
  103. "task_id": request.task_id,
  104. "status": "already_cancelled",
  105. "cancelled_at": task_info.get("cancelled_at")
  106. }
  107. )
  108. if task_status in ["completed", "failed"]:
  109. return TaskCancelResponse(
  110. code=400,
  111. message=f"任务已{task_status},无法取消",
  112. data={
  113. "task_id": request.task_id,
  114. "current_status": task_status
  115. }
  116. )
  117. # 【修复】使用 workflow_manager 的 set_outline_terminate_signal 方法
  118. # 支持 pending(预注册)和 processing(执行中)两种状态
  119. cancelled_at = int(time.time())
  120. result = await workflow_manager.set_outline_terminate_signal(
  121. callback_task_id=request.task_id,
  122. operator=request.user_id
  123. )
  124. if not result.get("success"):
  125. logger.warning(f"[{trace_id}] 设置终止信号失败: {result.get('message')}")
  126. return TaskCancelResponse(
  127. code=400,
  128. message=result.get("message", "取消任务失败"),
  129. data={
  130. "task_id": request.task_id,
  131. "current_status": task_info.get("status")
  132. }
  133. )
  134. logger.info(f"[{trace_id}] 终止信号已设置: {request.task_id}")
  135. # 【修复】如果是预注册状态(pending),任务已被直接取消,无需其他操作
  136. is_pre_registered = task_info.get("is_pre_registered", False)
  137. if is_pre_registered or task_status == "pending":
  138. logger.info(f"[{trace_id}] 预注册任务已被取消: {request.task_id}")
  139. # 更新进度信息
  140. try:
  141. await workflow_manager.progress_manager.update_stage_progress(
  142. callback_task_id=request.task_id,
  143. overall_task_status="cancelled",
  144. status="cancelled",
  145. message=f"任务已被用户取消: {request.cancel_reason}"
  146. )
  147. except Exception as e:
  148. logger.warning(f"[{trace_id}] 更新进度信息失败: {e}")
  149. return TaskCancelResponse(
  150. code=200,
  151. message="任务已成功取消(未开始执行)",
  152. data={
  153. "task_id": request.task_id,
  154. "status": "cancelled",
  155. "cancelled_at": cancelled_at,
  156. "cancel_reason": request.cancel_reason,
  157. "cancelled_by": request.user_id,
  158. "is_pre_registered": True
  159. }
  160. )
  161. # 对于正在执行的任务(processing),设置额外的取消标志(兼容旧版)
  162. try:
  163. import redis.asyncio as redis_async
  164. from redis.asyncio.connection import ConnectionPool
  165. pool = ConnectionPool(
  166. host='127.0.0.1',
  167. port=6379,
  168. password='123456',
  169. db=0,
  170. decode_responses=True,
  171. max_connections=20,
  172. socket_connect_timeout=10,
  173. socket_timeout=10,
  174. retry_on_timeout=True,
  175. health_check_interval=30
  176. )
  177. redis_client = redis_async.Redis(connection_pool=pool)
  178. # 设置终止标志(兼容旧版 outline_workflow 的检查)
  179. terminate_data = json.dumps({
  180. "cancelled": True,
  181. "cancelled_by": request.user_id,
  182. "cancel_reason": request.cancel_reason,
  183. "cancelled_at": cancelled_at
  184. })
  185. await redis_client.set(f"terminate:{request.task_id}", terminate_data, ex=3600)
  186. await redis_client.close()
  187. await pool.disconnect()
  188. except Exception as e:
  189. logger.warning(f"[{trace_id}] 设置兼容终止标志失败: {e}")
  190. # 不影响主流程,继续执行
  191. # 尝试终止 Celery 任务
  192. celery_task_id = task_info.get("celery_task_id") or task_info.get("celery_id")
  193. if celery_task_id:
  194. try:
  195. from celery import current_app as celery_app
  196. celery_app.control.revoke(celery_task_id, terminate=True)
  197. logger.info(f"[{trace_id}] Celery 终止信号已发送: {celery_task_id}")
  198. except Exception as e:
  199. logger.warning(f"[{trace_id}] 终止 Celery 任务失败: {e}")
  200. # 发送取消事件到 SSE
  201. try:
  202. cancel_event = {
  203. "callback_task_id": request.task_id,
  204. "status": "cancelled",
  205. "overall_task_status": "cancelled",
  206. "message": f"任务已被用户取消: {request.cancel_reason}",
  207. "cancelled_at": cancelled_at,
  208. "cancelled_by": request.user_id
  209. }
  210. await unified_sse_manager.send_progress(request.task_id, cancel_event)
  211. await unified_sse_manager.close_connection(request.task_id)
  212. except Exception as e:
  213. logger.warning(f"[{trace_id}] 关闭 SSE 连接失败: {e}")
  214. return TaskCancelResponse(
  215. code=200,
  216. message="任务取消成功",
  217. data={
  218. "task_id": request.task_id,
  219. "status": "cancelled",
  220. "cancelled_at": cancelled_at,
  221. "cancel_reason": request.cancel_reason,
  222. "cancelled_by": request.user_id
  223. }
  224. )
  225. except HTTPException:
  226. raise
  227. except Exception as e:
  228. logger.error(f"[{trace_id}] 取消任务异常: {str(e)}", exc_info=True)
  229. return TaskCancelResponse(
  230. code=500,
  231. message=f"取消任务失败: {str(e)}",
  232. data={"task_id": request.task_id}
  233. )
  234. @task_cancel_router.get("/task_status")
  235. @auto_trace(generate_if_missing=True)
  236. async def get_task_status(
  237. task_id: str,
  238. user_id: str
  239. ):
  240. """
  241. 获取任务状态
  242. Args:
  243. task_id: 任务ID
  244. user_id: 用户ID
  245. Returns:
  246. 任务状态信息
  247. """
  248. try:
  249. logger.info(f"查询任务状态: task_id={task_id}")
  250. # 验证用户
  251. validate_user_id(user_id)
  252. # 获取任务信息
  253. task_info = await workflow_manager.get_outline_sgbx_task_info(task_id)
  254. if not task_info:
  255. return {
  256. "code": 404,
  257. "message": "任务不存在或已完成",
  258. "data": None
  259. }
  260. return {
  261. "code": 200,
  262. "message": "查询成功",
  263. "data": {
  264. "task_id": task_id,
  265. "status": task_info.get("status") or task_info.get("overall_task_status"),
  266. "progress": task_info.get("current", 0),
  267. "message": task_info.get("message", ""),
  268. "updated_at": task_info.get("updated_at")
  269. }
  270. }
  271. except Exception as e:
  272. logger.error(f"查询任务状态失败: {str(e)}", exc_info=True)
  273. return {
  274. "code": 500,
  275. "message": f"查询失败: {str(e)}",
  276. "data": None
  277. }