regenerate_views.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. # -*- coding: utf-8 -*-
  2. """
  3. 重新生成大纲接口 (SSE 版本)
  4. """
  5. import uuid
  6. import json
  7. import time
  8. import asyncio
  9. from typing import Optional, Dict, Any, List, AsyncGenerator, Union
  10. from pydantic import BaseModel, Field
  11. from fastapi import APIRouter, HTTPException
  12. from fastapi.responses import StreamingResponse
  13. from foundation.observability.logger.loggering import write_logger as logger
  14. from foundation.infrastructure.tracing import TraceContext, auto_trace
  15. from core.base.workflow_manager import WorkflowManager
  16. from core.base.sse_manager import unified_sse_manager
  17. from core.base.progress_manager import ProgressManager
  18. from redis.asyncio import Redis as AsyncRedis
  19. # 创建路由
  20. regenerate_outline_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
  21. # 初始化工作流管理器
  22. workflow_manager = WorkflowManager(
  23. max_concurrent_docs=3,
  24. max_concurrent_reviews=5
  25. )
  26. # 初始化进度管理器
  27. progress_manager = ProgressManager()
  28. async def sse_progress_callback(callback_task_id: str, current_data: dict):
  29. """SSE 推送回调函数 - 接收进度更新并推送到客户端"""
  30. await unified_sse_manager.send_progress(callback_task_id, current_data)
  31. def format_sse_event(event_type: str, data: str) -> str:
  32. """格式化 SSE 事件 - 按照 SSE 协议格式化事件数据"""
  33. lines = [
  34. f"event: {event_type}",
  35. f"data: {data}",
  36. "",
  37. ""
  38. ]
  39. return "\n".join(lines) + "\n"
  40. class BaseInfo(BaseModel):
  41. """项目基础信息"""
  42. project_name: str = Field(..., description="方案名称", example="罗成依达大桥上部结构专项施工方案")
  43. construct_location: str = Field(..., description="建设地点", example="四川省凉山州")
  44. engineering_type: str = Field(..., description="方案模版类型", example="T型梁")
  45. class ProjectInfo(BaseModel):
  46. """项目信息(嵌套结构)"""
  47. base_info: BaseInfo = Field(..., description="基础信息")
  48. selectable: Optional[str] = Field("", description="其他可选信息")
  49. class TemplateStructureItem(BaseModel):
  50. """模板结构项(支持嵌套children)"""
  51. index: str = Field(..., description="章节编号", example="2")
  52. level: int = Field(..., description="层级", ge=1, le=5)
  53. title: str = Field(..., description="章节标题", example="工程概况")
  54. code: str = Field(..., description="章节代码", example="overview")
  55. children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
  56. class GenerationTemplate(BaseModel):
  57. """大纲生成模板"""
  58. source_file: Optional[str] = Field(None, description="源文件", example="方案编写助手原文关键词规范文档修改版-2026-2-5.md")
  59. alias: Optional[str] = Field(None, description="别名", example="施工方案知识审查与编写体系")
  60. structure: List[Union[TemplateStructureItem, Dict[str, Any]]] = Field(..., description="模板结构")
  61. class RegenerateOutlineRequest(BaseModel):
  62. """重新生成大纲请求
  63. 复用大纲生成接口的请求定义,额外添加 regenerate_config 字段用于指定重新生成配置。
  64. project_info 和 generation_template 为可选字段,不传入则使用原任务的信息。
  65. 示例请求:
  66. {
  67. "task_id": "task-20250130-123456",
  68. "user_id": "user-001",
  69. "project_info": { // 可选,不传则使用原任务的项目信息
  70. "base_info": {
  71. "project_name": "罗成依达大桥上部结构专项施工方案",
  72. "construct_location": "四川省凉山州",
  73. "engineering_type": "T型梁"
  74. },
  75. "selectable": ""
  76. },
  77. "generation_template": { // 可选,不传则使用原任务的模板
  78. "source_file": "...",
  79. "alias": "...",
  80. "structure": [...]
  81. },
  82. "generation_chapterenum": ["overview_DesignSummary_MainTechnicalStandards"], // 可选
  83. "regenerate_config": {
  84. "regenerate_mode": "chapter",
  85. "target_path": "2.1",
  86. "preserve_children": true,
  87. "reason": "调整内容结构"
  88. }
  89. }
  90. """
  91. task_id: str = Field(..., description="原大纲生成任务ID")
  92. user_id: str = Field(..., description="用户ID")
  93. project_info: Optional[ProjectInfo] = Field(None, description="项目基础信息(可选)")
  94. generation_template: Optional[GenerationTemplate] = Field(None, description="大纲生成模板(可选)")
  95. generation_chapterenum: Optional[List[str]] = Field(None, description="生成章节代码列表(可选)")
  96. regenerate_config: Dict[str, Any] = Field(..., description="重新生成配置")
  97. @regenerate_outline_router.post("/regenerate_outline", response_model=None)
  98. @auto_trace(generate_if_missing=True)
  99. async def regenerate_outline(request: RegenerateOutlineRequest):
  100. """
  101. 重新生成大纲接口 (SSE 流式响应)
  102. 【任务状态管理】
  103. - 重新生成会创建**新任务**,原任务状态保持不变
  104. - 新任务通过 regenerate_config.source_task_id 关联原任务
  105. - 原任务仍可查询,不受影响
  106. 【字段说明】
  107. - generation_chapterenum: 可选,默认使用原任务的章节列表
  108. - project_info: 可选,默认使用原任务的项目信息
  109. - generation_template: 可选,默认使用原任务的模板
  110. 【错误处理】
  111. - 原任务不存在: 返回 404 错误事件
  112. - 原任务已完成/失败: 允许重新生成(基于已完成结果进行局部调整)
  113. - 重新生成配置缺失: 返回 400 错误事件
  114. 【与 /generating_outline 的复用关系】
  115. - 复用 generating_outline 的核心 SSE 事件流生成逻辑
  116. - 差异点:1) 构建任务信息时合并原任务数据 2) 添加 regenerate_config 标记
  117. """
  118. # ===== 1. 参数校验 =====
  119. if not request.regenerate_config:
  120. logger.error("重新生成配置缺失")
  121. raise HTTPException(status_code=400, detail="regenerate_config 为必填项")
  122. # 生成新任务ID(重要:重新生成创建新任务,不覆盖原任务)
  123. new_callback_task_id = f"outline_regen_{uuid.uuid4().hex[:16]}"
  124. source_task_id = request.task_id # 原任务ID用于数据查询
  125. TraceContext.set_trace_id(new_callback_task_id)
  126. user_id = request.user_id
  127. regenerate_config = request.regenerate_config
  128. logger.info(f"接收重新生成大纲 SSE 请求: "
  129. f"source_task_id={source_task_id}, "
  130. f"new_task_id={new_callback_task_id}, "
  131. f"user_id={user_id}, "
  132. f"target={regenerate_config.get('target_path', 'unknown')}")
  133. # ===== 2. 获取原任务信息(带错误处理)=====
  134. original_task = None
  135. try:
  136. original_task = await workflow_manager.get_outline_sgbx_task_info(source_task_id)
  137. except Exception as e:
  138. logger.warning(f"获取原任务信息异常: {source_task_id}, error={e}")
  139. # 原任务不存在处理
  140. if not original_task:
  141. logger.error(f"原任务不存在: {source_task_id}")
  142. async def error_not_found():
  143. error_data = json.dumps({
  144. "callback_task_id": new_callback_task_id,
  145. "source_task_id": source_task_id,
  146. "user_id": user_id,
  147. "current": 0,
  148. "stage_name": "原任务不存在",
  149. "status": "error",
  150. "message": f"原任务不存在或已过期: {source_task_id}",
  151. "overall_task_status": "failed",
  152. "error_code": "SOURCE_TASK_NOT_FOUND",
  153. "updated_at": int(time.time())
  154. }, ensure_ascii=False)
  155. yield format_sse_event("error", error_data)
  156. return StreamingResponse(
  157. error_not_found(),
  158. media_type="text/event-stream",
  159. headers={
  160. "Cache-Control": "no-cache",
  161. "Connection": "keep-alive",
  162. "X-Accel-Buffering": "no"
  163. }
  164. )
  165. # 获取原任务状态
  166. original_status = original_task.get("status") or original_task.get("overall_task_status", "unknown")
  167. logger.info(f"原任务状态: {source_task_id} = {original_status}")
  168. # 使用统一 SSE 管理器建立连接(使用新任务ID)
  169. queue = await unified_sse_manager.establish_connection(new_callback_task_id, sse_progress_callback)
  170. # ===== 3. 复用 generating_outline 的核心逻辑 =====
  171. async def generate_regenerate_events() -> AsyncGenerator[str, None]:
  172. """生成重新生成 SSE 事件流 - 复用 generating_outline 模式"""
  173. redis_check_client = None
  174. try:
  175. # ===== 3.1 初始化 Redis 连接(复用 generating_outline 模式)=====
  176. try:
  177. redis_check_client = AsyncRedis(
  178. host='127.0.0.1',
  179. port=6379,
  180. password='123456',
  181. db=0,
  182. decode_responses=True,
  183. socket_connect_timeout=2,
  184. socket_timeout=2
  185. )
  186. except Exception as e:
  187. logger.warning(f"[{new_callback_task_id}] 创建取消检查Redis连接失败: {e}")
  188. # 定义取消检查函数(复用 generating_outline 模式)
  189. async def is_task_cancelled() -> bool:
  190. """检查任务是否被取消"""
  191. if not redis_check_client or not new_callback_task_id:
  192. return False
  193. try:
  194. return await redis_check_client.exists(f"terminate:{new_callback_task_id}") > 0
  195. except Exception:
  196. return False
  197. # ===== 3.2 检查取消(复用 generating_outline 检查点1)=====
  198. if await is_task_cancelled():
  199. logger.info(f"[{new_callback_task_id}] 连接建立前检测到取消信号")
  200. cancelled_data = json.dumps({
  201. "callback_task_id": new_callback_task_id,
  202. "source_task_id": source_task_id,
  203. "user_id": user_id,
  204. "current": 0,
  205. "stage_name": "任务已取消",
  206. "status": "cancelled",
  207. "message": "任务已被用户取消",
  208. "overall_task_status": "cancelled",
  209. "updated_at": int(time.time())
  210. }, ensure_ascii=False)
  211. yield format_sse_event("cancelled", cancelled_data)
  212. return
  213. # ===== 3.3 发送连接确认(复用 generating_outline 模式)=====
  214. connected_data = json.dumps({
  215. "callback_task_id": new_callback_task_id,
  216. "source_task_id": source_task_id,
  217. "user_id": user_id,
  218. "current": 0,
  219. "stage_name": "连接建立",
  220. "status": "connected",
  221. "message": f"SSE 连接已建立,正在启动重新生成任务(原任务: {source_task_id}, 状态: {original_status})...",
  222. "overall_task_status": "processing",
  223. "updated_at": int(time.time())
  224. }, ensure_ascii=False)
  225. yield format_sse_event("connected", connected_data)
  226. # ===== 3.4 构建任务信息(合并原任务数据 + 新配置)=====
  227. # 优先使用传入的 project_info,否则使用原任务的
  228. if request.project_info:
  229. base_info = request.project_info.base_info
  230. project_info_flat = {
  231. "project_name": base_info.project_name,
  232. "construct_location": base_info.construct_location,
  233. "engineering_type": base_info.engineering_type,
  234. "selectable": request.project_info.selectable or ""
  235. }
  236. else:
  237. project_info_flat = original_task.get("project_info", {})
  238. # 处理 generation_template
  239. if request.generation_template:
  240. outline_structure = [
  241. item.dict() if isinstance(item, TemplateStructureItem) else item
  242. for item in request.generation_template.structure
  243. ]
  244. template_alias = request.generation_template.alias or "default_template"
  245. else:
  246. # 从原任务提取模板结构
  247. outline_structure = original_task.get("generation_template", [])
  248. if not outline_structure:
  249. outline_structure = original_task.get("results", {}).get("outline_structure", [])
  250. template_alias = original_task.get("template_id", "default_template")
  251. # 处理 generation_chapterenum(可选,默认使用原任务)
  252. generation_chapterenum = request.generation_chapterenum
  253. if generation_chapterenum is None:
  254. generation_chapterenum = original_task.get("generation_chapterenum", [])
  255. # 如果原任务也没有,则根据 regenerate_config.target_path 推断
  256. if not generation_chapterenum and regenerate_config.get("target_path"):
  257. target_path = regenerate_config.get("target_path")
  258. # 内嵌:根据路径查找章节代码的逻辑
  259. original_outline = original_task.get("results", {}).get("outline_structure", [])
  260. chapter_code = None
  261. if original_outline and target_path:
  262. path_parts = target_path.split(".")
  263. def search_in_nodes(nodes, depth=0):
  264. if depth >= len(path_parts):
  265. return None
  266. target_index = path_parts[depth]
  267. for node in nodes:
  268. node_index = str(node.get("index", ""))
  269. if node_index == target_index:
  270. if depth == len(path_parts) - 1:
  271. return node.get("code")
  272. children = node.get("children", [])
  273. if children:
  274. result = search_in_nodes(children, depth + 1)
  275. if result:
  276. return result
  277. return None
  278. chapter_code = search_in_nodes(original_outline)
  279. if chapter_code:
  280. generation_chapterenum = [chapter_code]
  281. # 构建完整任务信息(与 generating_outline 格式保持一致)
  282. sgbx_task_info = {
  283. "callback_task_id": new_callback_task_id,
  284. "source_task_id": source_task_id, # 关联原任务
  285. "user_id": user_id,
  286. "project_info": project_info_flat,
  287. "template_id": template_alias,
  288. "generation_chapterenum": generation_chapterenum,
  289. "generation_template": outline_structure,
  290. "similarity_config": original_task.get("similarity_config", {
  291. "topk_plans": 3,
  292. "topk_fragments": 10,
  293. "threshold": 0.75
  294. }),
  295. "knowledge_config": original_task.get("knowledge_config", {
  296. "topk": 3,
  297. "threshold": 0.75
  298. }),
  299. # 重新生成特有配置
  300. "regenerate_config": regenerate_config,
  301. "is_regenerate": True,
  302. "original_task_status": original_status # 记录原任务状态
  303. }
  304. logger.info(f"重新生成任务信息构建完成: "
  305. f"new_task_id={new_callback_task_id}, "
  306. f"source_task_id={source_task_id}, "
  307. f"target={regenerate_config.get('target_path', 'unknown')}, "
  308. f"chapters={generation_chapterenum}")
  309. # ===== 3.5 检查取消(复用 generating_outline 检查点2)=====
  310. if await is_task_cancelled():
  311. logger.info(f"[{new_callback_task_id}] 任务提交前检测到取消信号")
  312. cancelled_data = json.dumps({
  313. "callback_task_id": new_callback_task_id,
  314. "source_task_id": source_task_id,
  315. "user_id": user_id,
  316. "current": 0,
  317. "stage_name": "任务已取消",
  318. "status": "cancelled",
  319. "message": "任务已被用户取消",
  320. "overall_task_status": "cancelled",
  321. "updated_at": int(time.time())
  322. }, ensure_ascii=False)
  323. yield format_sse_event("cancelled", cancelled_data)
  324. return
  325. # ===== 3.6 发送处理中事件(复用 generating_outline 模式)=====
  326. processing_data = json.dumps({
  327. "callback_task_id": new_callback_task_id,
  328. "source_task_id": source_task_id,
  329. "user_id": user_id,
  330. "current": 5,
  331. "stage_name": "任务提交中",
  332. "status": "processing",
  333. "message": f"正在提交重新生成任务(目标: {regenerate_config.get('target_path', 'unknown')})...",
  334. "overall_task_status": "processing",
  335. "updated_at": int(time.time())
  336. }, ensure_ascii=False)
  337. yield format_sse_event("processing", processing_data)
  338. # ===== 3.7 提交任务到 Celery(复用 generating_outline 模式)=====
  339. celery_task_id = await workflow_manager.submit_outline_generation_task(sgbx_task_info)
  340. logger.info(f"重新生成任务已提交: "
  341. f"new_callback_task_id={new_callback_task_id}, "
  342. f"celery_task_id={celery_task_id}")
  343. # 发送任务提交成功事件
  344. submitted_data = json.dumps({
  345. "callback_task_id": new_callback_task_id,
  346. "source_task_id": source_task_id,
  347. "user_id": user_id,
  348. "current": 10,
  349. "stage_name": "任务已提交",
  350. "status": "submitted",
  351. "message": "重新生成任务已提交,正在执行...",
  352. "overall_task_status": "processing",
  353. "updated_at": int(time.time()),
  354. "celery_task_id": celery_task_id
  355. }, ensure_ascii=False)
  356. yield format_sse_event("submitted", submitted_data)
  357. # ===== 3.8 持续监听进度(完全复用 generating_outline 模式)=====
  358. last_progress = 10
  359. last_progress_data = None
  360. last_event_type = "processing"
  361. last_message = ""
  362. no_change_count = 0
  363. while True:
  364. try:
  365. # 检查取消(复用 generating_outline 检查点3)
  366. if await is_task_cancelled():
  367. logger.info(f"[{new_callback_task_id}] 进度轮询中检测到取消信号")
  368. cancelled_data = json.dumps({
  369. "callback_task_id": new_callback_task_id,
  370. "source_task_id": source_task_id,
  371. "user_id": user_id,
  372. "current": last_progress,
  373. "stage_name": "任务已取消",
  374. "status": "cancelled",
  375. "message": "任务已被用户取消",
  376. "overall_task_status": "cancelled",
  377. "updated_at": int(time.time())
  378. }, ensure_ascii=False)
  379. yield format_sse_event("cancelled", cancelled_data)
  380. return
  381. # 从 Redis 获取最新进度
  382. progress_data = await progress_manager.get_progress(new_callback_task_id)
  383. if progress_data:
  384. current_progress = progress_data.get("current", last_progress)
  385. current_event_type = progress_data.get("event_type", "processing")
  386. current_message = progress_data.get("message", "")
  387. # 检查进度数据中是否已经是取消状态
  388. if progress_data.get("overall_task_status") == "cancelled":
  389. logger.info(f"[{new_callback_task_id}] 从进度数据检测到取消状态")
  390. yield format_sse_event("cancelled", json.dumps(progress_data, ensure_ascii=False))
  391. return
  392. # 进度有变化时推送
  393. should_push = False
  394. if current_progress != last_progress:
  395. should_push = True
  396. elif current_event_type != last_event_type:
  397. should_push = True
  398. elif current_message != last_message:
  399. should_push = True
  400. elif last_progress_data is None:
  401. should_push = True
  402. elif progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status"):
  403. should_push = True
  404. if should_push:
  405. last_progress = current_progress
  406. last_event_type = current_event_type
  407. last_message = current_message
  408. last_progress_data = progress_data
  409. yield format_sse_event("processing", json.dumps(progress_data, ensure_ascii=False))
  410. no_change_count = 0
  411. else:
  412. no_change_count += 1
  413. # 检查任务状态
  414. status = progress_data.get("overall_task_status")
  415. # 检测到取消立即返回
  416. if status == "cancelled":
  417. logger.info(f"[{new_callback_task_id}] 检测到任务已取消")
  418. yield format_sse_event("cancelled", json.dumps(progress_data, ensure_ascii=False))
  419. return
  420. # 检查任务是否完成
  421. if status in ["completed", "failed", "terminated"]:
  422. break
  423. await asyncio.sleep(0.5)
  424. # 每 6 秒发送一次心跳
  425. if no_change_count >= 30:
  426. heartbeat_data = json.dumps({
  427. "callback_task_id": new_callback_task_id,
  428. "source_task_id": source_task_id,
  429. "user_id": user_id,
  430. "current": last_progress,
  431. "stage_name": "执行中",
  432. "status": "processing",
  433. "message": "重新生成任务正在执行中...",
  434. "overall_task_status": "processing",
  435. "updated_at": int(time.time())
  436. }, ensure_ascii=False)
  437. yield format_sse_event("heartbeat", heartbeat_data)
  438. no_change_count = 0
  439. except Exception as e:
  440. logger.warning(f"轮询进度异常: {new_callback_task_id}, 错误: {str(e)}")
  441. await asyncio.sleep(0.5)
  442. # ===== 3.9 获取最终结果(复用 generating_outline 模式)=====
  443. final_result = await workflow_manager.get_outline_sgbx_task_info(new_callback_task_id)
  444. # 检查取消(复用 generating_outline 检查点4)
  445. if await is_task_cancelled():
  446. logger.info(f"[{new_callback_task_id}] 结果返回前检测到取消信号")
  447. cancelled_data = json.dumps({
  448. "callback_task_id": new_callback_task_id,
  449. "source_task_id": source_task_id,
  450. "user_id": user_id,
  451. "current": last_progress,
  452. "stage_name": "任务已取消",
  453. "status": "cancelled",
  454. "message": "任务已被用户取消",
  455. "overall_task_status": "cancelled",
  456. "updated_at": int(time.time())
  457. }, ensure_ascii=False)
  458. yield format_sse_event("cancelled", cancelled_data)
  459. return
  460. # 检查任务结果是否为已取消
  461. if final_result and final_result.get("status") == "cancelled":
  462. logger.info(f"[{new_callback_task_id}] 任务结果状态为已取消,不返回实际结果")
  463. cancelled_data = json.dumps({
  464. "callback_task_id": new_callback_task_id,
  465. "source_task_id": source_task_id,
  466. "user_id": user_id,
  467. "current": last_progress,
  468. "stage_name": "任务已取消",
  469. "status": "cancelled",
  470. "message": final_result.get("message", "任务已被用户取消"),
  471. "overall_task_status": "cancelled",
  472. "updated_at": int(time.time())
  473. }, ensure_ascii=False)
  474. yield format_sse_event("cancelled", cancelled_data)
  475. return
  476. # ===== 3.10 返回最终结果(复用 generating_outline 模式)=====
  477. if final_result and final_result.get("status") == "completed":
  478. completed_data = json.dumps({
  479. "callback_task_id": new_callback_task_id,
  480. "source_task_id": source_task_id,
  481. "user_id": user_id,
  482. "current": 100,
  483. "stage_name": "重新生成完成",
  484. "status": "completed",
  485. "message": "大纲重新生成任务已完成",
  486. "overall_task_status": "completed",
  487. "updated_at": int(time.time()),
  488. "result": {
  489. "outline_structure": final_result.get("results", {}).get("outline_structure", []),
  490. "similar_plan": final_result.get("results", {}).get("similar_plan", [])
  491. }
  492. }, ensure_ascii=False)
  493. yield format_sse_event("completed", completed_data)
  494. else:
  495. failed_data = json.dumps({
  496. "callback_task_id": new_callback_task_id,
  497. "source_task_id": source_task_id,
  498. "user_id": user_id,
  499. "current": last_progress,
  500. "stage_name": "任务失败",
  501. "status": "failed",
  502. "message": final_result.get("results", {}).get("error", "重新生成任务失败") if final_result else "任务执行失败",
  503. "overall_task_status": "failed",
  504. "updated_at": int(time.time())
  505. }, ensure_ascii=False)
  506. yield format_sse_event("failed", failed_data)
  507. except Exception as e:
  508. logger.error(f"重新生成大纲 SSE 事件流错误: {str(e)}", exc_info=True)
  509. error_data = json.dumps({
  510. "callback_task_id": new_callback_task_id,
  511. "source_task_id": source_task_id,
  512. "user_id": user_id,
  513. "current": 0,
  514. "stage_name": "系统错误",
  515. "status": "error",
  516. "message": f"系统错误: {str(e)}",
  517. "overall_task_status": "failed",
  518. "updated_at": int(time.time())
  519. }, ensure_ascii=False)
  520. yield format_sse_event("error", error_data)
  521. finally:
  522. # 关闭 Redis 连接
  523. if redis_check_client:
  524. try:
  525. await redis_check_client.close()
  526. except Exception:
  527. pass
  528. # 关闭 SSE 连接
  529. await unified_sse_manager.close_connection(new_callback_task_id)
  530. return StreamingResponse(
  531. generate_regenerate_events(),
  532. media_type="text/event-stream",
  533. headers={
  534. "Cache-Control": "no-cache",
  535. "Connection": "keep-alive",
  536. "X-Accel-Buffering": "no"
  537. }
  538. )