outline_views.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851
  1. # -*- coding: utf-8 -*-
  2. """
  3. 大纲生成 API 接口 (SSE 版本)
  4. 集成到 Celery + WorkflowManager 架构中
  5. 提供以下接口:
  6. - SSE /sgbx/generating_outline: SSE 流式大纲生成
  7. - SSE /sgbx/regenerate_outline: SSE 流式重新生成
  8. - POST /sgbx/task_cancel: 取消大纲生成任务
  9. """
  10. import uuid
  11. import json
  12. import time
  13. import asyncio
  14. from typing import Optional, Dict, Any, List, AsyncGenerator, Union
  15. from pydantic import BaseModel, Field
  16. from fastapi import APIRouter, HTTPException, Query
  17. from fastapi.responses import StreamingResponse
  18. from foundation.observability.logger.loggering import write_logger as logger
  19. from foundation.infrastructure.tracing import TraceContext, auto_trace
  20. from core.base.workflow_manager import WorkflowManager
  21. from core.base.sse_manager import unified_sse_manager
  22. from core.base.progress_manager import ProgressManager
  23. # 创建路由
  24. outline_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
  25. # 初始化工作流管理器
  26. workflow_manager = WorkflowManager(
  27. max_concurrent_docs=3,
  28. max_concurrent_reviews=5
  29. )
  30. # 初始化进度管理器
  31. progress_manager = ProgressManager()
  32. async def sse_progress_callback(callback_task_id: str, current_data: dict):
  33. """SSE 推送回调函数 - 接收进度更新并推送到客户端"""
  34. await unified_sse_manager.send_progress(callback_task_id, current_data)
  35. def format_sse_event(event_type: str, data: str) -> str:
  36. """格式化 SSE 事件 - 按照 SSE 协议格式化事件数据"""
  37. lines = [
  38. f"event: {event_type}",
  39. f"data: {data}",
  40. "",
  41. ""
  42. ]
  43. return "\n".join(lines) + "\n"
  44. # ==================== 请求/响应模型 ====================
  45. class BaseInfo(BaseModel):
  46. """项目基础信息"""
  47. project_name: str = Field(..., description="方案名称", example="罗成依达大桥上部结构专项施工方案")
  48. construct_location: str = Field(..., description="建设地点", example="四川省凉山州")
  49. engineering_type: str = Field(..., description="方案模版类型", example="T型梁")
  50. class ProjectInfo(BaseModel):
  51. """项目信息(嵌套结构)"""
  52. base_info: BaseInfo = Field(..., description="基础信息")
  53. selectable: Optional[str] = Field("", description="其他可选信息")
  54. class TemplateStructureItem(BaseModel):
  55. """模板结构项(支持嵌套children)"""
  56. index: str = Field(..., description="章节编号", example="2")
  57. level: int = Field(..., description="层级", ge=1, le=5)
  58. title: str = Field(..., description="章节标题", example="工程概况")
  59. code: str = Field(..., description="章节代码", example="overview")
  60. # 使用 Union 支持递归类型
  61. children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
  62. class GenerationTemplate(BaseModel):
  63. """大纲生成模板
  64. 示例:
  65. {
  66. "source_file": "方案编写助手原文关键词规范文档修改版-2026-2-5.md",
  67. "alias": "施工方案知识审查与编写体系",
  68. "structure": [
  69. {
  70. "index": "2",
  71. "level": 1,
  72. "title": "工程概况",
  73. "code": "overview",
  74. "children": [...]
  75. }
  76. ]
  77. }
  78. """
  79. source_file: Optional[str] = Field(None, description="源文件", example="方案编写助手原文关键词规范文档修改版-2026-2-5.md")
  80. alias: Optional[str] = Field(None, description="别名", example="施工方案知识审查与编写体系")
  81. structure: List[Union[TemplateStructureItem, Dict[str, Any]]] = Field(..., description="模板结构")
  82. class OutlineGenerationRequest(BaseModel):
  83. """大纲生成请求
  84. 示例请求体(适配curl示例):
  85. {
  86. "user_id": "user-001",
  87. "project_info": {
  88. "base_info": {
  89. "project_name": "罗成依达大桥上部结构专项施工方案",
  90. "construct_location": "四川省凉山州",
  91. "engineering_type": "T型梁"
  92. },
  93. "selectable": ""
  94. },
  95. "generation_template": {
  96. "source_file": "方案编写助手原文关键词规范文档修改版-2026-2-5.md",
  97. "alias": "施工方案知识审查与编写体系",
  98. "structure": [...]
  99. },
  100. "generation_chapterenum": ["overview_DesignSummary_ProjectIntroduction", ...]
  101. }
  102. """
  103. user_id: str = Field(..., description="用户标识", example="user-001")
  104. project_info: ProjectInfo = Field(..., description="项目基础信息")
  105. generation_template: GenerationTemplate = Field(..., description="大纲生成模板")
  106. generation_chapterenum: List[str] = Field(default_factory=list, description="生成章节代码列表,为空时生成全部章节")
  107. class RegenerateOutlineRequest(BaseModel):
  108. """重新生成大纲请求
  109. 复用大纲生成接口的请求定义,额外添加 regenerate_config 字段用于指定重新生成配置。
  110. project_info 和 generation_template 为可选字段,不传入则使用原任务的信息。
  111. 示例请求:
  112. {
  113. "task_id": "task-20250130-123456",
  114. "user_id": "user-001",
  115. "project_info": { // 可选,不传则使用原任务的项目信息
  116. "base_info": {
  117. "project_name": "罗成依达大桥上部结构专项施工方案",
  118. "construct_location": "四川省凉山州",
  119. "engineering_type": "T型梁"
  120. },
  121. "selectable": ""
  122. },
  123. "generation_template": { // 可选,不传则使用原任务的模板
  124. "source_file": "...",
  125. "alias": "...",
  126. "structure": [...]
  127. },
  128. "generation_chapterenum": ["overview_DesignSummary_MainTechnicalStandards"], // 可选
  129. "regenerate_config": {
  130. "regenerate_mode": "chapter",
  131. "target_path": "2.1",
  132. "preserve_children": true,
  133. "reason": "调整内容结构"
  134. }
  135. }
  136. """
  137. task_id: str = Field(..., description="原大纲生成任务ID")
  138. user_id: str = Field(..., description="用户ID")
  139. # 可选:复用大纲生成接口的项目信息(不传则使用原任务的)
  140. project_info: Optional[ProjectInfo] = Field(None, description="项目基础信息(可选)")
  141. # 可选:复用大纲生成接口的模板(不传则使用原任务的)
  142. generation_template: Optional[GenerationTemplate] = Field(None, description="大纲生成模板(可选)")
  143. # 可选:复用大纲生成接口的章节代码列表
  144. generation_chapterenum: Optional[List[str]] = Field(None, description="生成章节代码列表(可选)")
  145. # 重新生成特有的配置
  146. regenerate_config: Dict[str, Any] = Field(..., description="重新生成配置")
  147. class TaskCancelRequest(BaseModel):
  148. """任务取消请求"""
  149. task_id: str = Field(..., description="任务ID")
  150. user_id: str = Field(..., description="用户ID")
  151. cancel_reason: Optional[str] = Field("用户主动取消", description="取消原因")
  152. # ==================== 响应模型 ====================
  153. class OutlineNodeResponse(BaseModel):
  154. """大纲节点响应模型
  155. 与请求的 TemplateStructureItem 对应,增加以下字段:
  156. - 每级节点都包含 generated_content
  157. - 2级和3级节点包含 similar_fragments
  158. - 末级节点额外包含 key_points 和 knowledge_bases
  159. """
  160. index: str = Field(..., description="章节编号", example="2.1.1")
  161. level: int = Field(..., description="层级", ge=1, le=5, example=3)
  162. title: str = Field(..., description="章节标题", example="工程简介")
  163. code: str = Field(..., description="章节代码", example="overview_DesignSummary_ProjectIntroduction")
  164. generated_content: str = Field(..., description="AI生成的内容", example="罗成依达大桥位于四川省凉山州...")
  165. # 2级和3级节点包含 similar_fragments
  166. similar_fragments: Optional[List[Dict[str, Any]]] = Field(None, description="相似片段推荐(2级和3级节点)")
  167. key_points: Optional[List[str]] = Field(None, description="核心要点(仅末级节点)", example=["桥位位置", "桥梁规模"])
  168. knowledge_bases: Optional[List[str]] = Field(None, description="知识点/编制依据(仅末级节点)", example=["《公路桥涵设计通用规范》JTG D60-2015"])
  169. children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
  170. class SimilarPlanResponse(BaseModel):
  171. """相似方案响应(整篇方案级推荐)"""
  172. plan_id: str = Field(..., description="方案ID")
  173. plan_title: str = Field(..., description="方案标题")
  174. similarity_score: float = Field(..., description="相似度分数", ge=0.0, le=1.0)
  175. plan_type: str = Field(..., description="方案类型")
  176. outline: Optional[List[Dict]] = Field(None, description="方案大纲结构")
  177. metadata: Optional[Dict[str, Any]] = Field(None, description="元数据")
  178. class SimilarFragmentResponse(BaseModel):
  179. """相似片段响应"""
  180. fragment_id: str = Field(..., description="片段ID")
  181. section_path: str = Field(..., description="所属章节路径")
  182. section_title: str = Field(..., description="章节标题")
  183. fragment_content: str = Field(..., description="片段内容")
  184. similarity_score: float = Field(..., description="相似度分数", ge=0.0, le=1.0)
  185. source_document_id: str = Field(..., description="来源文档ID")
  186. source_document_title: str = Field(..., description="来源文档标题")
  187. class OutlineGenerationResult(BaseModel):
  188. """大纲生成结果
  189. 结构说明:
  190. - outline_structure: 嵌套大纲结构,每个节点包含 generated_content
  191. - 2级和3级节点额外包含 similar_fragments
  192. - 末级节点额外包含 key_points 和 knowledge_bases
  193. - similar_plan: 整篇方案的相似方案推荐(顶层)
  194. """
  195. outline_structure: List[OutlineNodeResponse] = Field(..., description="大纲结构(包含AI生成内容和章节级similar_fragments)")
  196. similar_plan: List[SimilarPlanResponse] = Field(default_factory=list, description="相似方案推荐(整篇方案级)")
  197. # ==================== SSE 接口实现 ====================
  198. @outline_router.post("/generating_outline", response_model=None)
  199. @auto_trace(generate_if_missing=True)
  200. async def generating_outline(request: OutlineGenerationRequest):
  201. """
  202. 大纲生成接口 (SSE 流式响应)
  203. 流程:
  204. 1. 接收请求参数
  205. 2. 生成 callback_task_id
  206. 3. 提交任务到 Celery
  207. 4. 通过 SSE 流式返回进度和结果
  208. Args:
  209. request: 大纲生成请求参数
  210. Returns:
  211. StreamingResponse: SSE 事件流
  212. Response 格式说明:
  213. 完成时返回的 result 包含以下字段:
  214. - outline_structure: 与请求的 generation_template.structure 结构一致
  215. - 每级节点包含 generated_content
  216. - 2级和3级节点额外包含 similar_fragments(章节级相似片段)
  217. - 末级节点额外包含 key_points 和 knowledge_bases
  218. - similar_plan: 整篇方案的相似方案推荐(顶层)
  219. 示例响应结构:
  220. {
  221. "outline_structure": [
  222. {
  223. "index": "2",
  224. "level": 1,
  225. "title": "工程概况",
  226. "code": "overview",
  227. "generated_content": "本章主要介绍...",
  228. "children": [
  229. {
  230. "index": "2.1",
  231. "level": 2,
  232. "title": "设计概况",
  233. "code": "overview_DesignSummary",
  234. "generated_content": "本节介绍...",
  235. "similar_fragments": [...], // 2级节点有相似片段
  236. "children": [
  237. {
  238. "index": "2.1.1",
  239. "level": 3,
  240. "title": "工程简介",
  241. "code": "overview_DesignSummary_ProjectIntroduction",
  242. "generated_content": "罗成依达大桥位于...",
  243. "similar_fragments": [...], // 3级节点有相似片段
  244. "key_points": ["桥位位置", "桥梁规模"],
  245. "knowledge_bases": ["《公路桥涵设计通用规范》"]
  246. }
  247. ]
  248. }
  249. ]
  250. }
  251. ],
  252. "similar_plan": [...] // 整篇方案级相似推荐
  253. }
  254. """
  255. callback_task_id = f"outline_{uuid.uuid4().hex[:16]}"
  256. TraceContext.set_trace_id(callback_task_id)
  257. user_id = request.user_id
  258. logger.info(f"接收大纲生成 SSE 请求: user_id={user_id}, project={request.project_info.base_info.project_name}")
  259. # 使用统一 SSE 管理器建立连接并注册回调
  260. queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
  261. async def generate_outline_events() -> AsyncGenerator[str, None]:
  262. """生成大纲生成 SSE 事件流"""
  263. try:
  264. # 发送连接确认事件
  265. connected_data = json.dumps({
  266. "callback_task_id": callback_task_id,
  267. "user_id": user_id,
  268. "current": 0,
  269. "stage_name": "连接建立",
  270. "status": "connected",
  271. "message": "SSE 连接已建立,正在启动大纲生成任务...",
  272. "overall_task_status": "processing",
  273. "updated_at": int(time.time())
  274. }, ensure_ascii=False)
  275. yield format_sse_event("connected", connected_data)
  276. # 构建任务信息(适配新的请求格式)
  277. # 提取项目基础信息
  278. base_info = request.project_info.base_info
  279. project_info_flat = {
  280. "project_name": base_info.project_name,
  281. "construct_location": base_info.construct_location,
  282. "engineering_type": base_info.engineering_type,
  283. "selectable": request.project_info.selectable or ""
  284. }
  285. # 构建任务信息
  286. sgbx_task_info = {
  287. "callback_task_id": callback_task_id,
  288. "user_id": user_id,
  289. "project_info": project_info_flat,
  290. #"generation_template": request.generation_template.dict(),
  291. "template_id": request.generation_template.alias or "default_template",
  292. "generation_chapterenum": request.generation_chapterenum,
  293. "generation_template": [
  294. item.dict() if isinstance(item, TemplateStructureItem) else item
  295. for item in request.generation_template.structure
  296. ],
  297. "similarity_config": {
  298. "topk_plans": 3,
  299. "topk_fragments": 10,
  300. "threshold": 0.75
  301. },
  302. "knowledge_config": {
  303. "topk": 3,
  304. "threshold": 0.75
  305. },
  306. }
  307. # 发送处理中事件
  308. processing_data = json.dumps({
  309. "callback_task_id": callback_task_id,
  310. "user_id": user_id,
  311. "current": 5,
  312. "stage_name": "任务提交中",
  313. "status": "processing",
  314. "message": "正在提交大纲生成任务...",
  315. "overall_task_status": "processing",
  316. "updated_at": int(time.time())
  317. }, ensure_ascii=False)
  318. yield format_sse_event("processing", processing_data)
  319. # 提交任务到 Celery
  320. celery_task_id = await workflow_manager.submit_outline_generation_task(sgbx_task_info)
  321. logger.info(f"大纲生成任务已提交: callback_task_id={callback_task_id}, celery_task_id={celery_task_id}")
  322. # 发送任务提交成功事件
  323. submitted_data = json.dumps({
  324. "callback_task_id": callback_task_id,
  325. "user_id": user_id,
  326. "current": 10,
  327. "stage_name": "任务已提交",
  328. "status": "submitted",
  329. "message": "大纲生成任务已提交,正在执行...",
  330. "overall_task_status": "processing",
  331. "updated_at": int(time.time()),
  332. "celery_task_id": celery_task_id
  333. }, ensure_ascii=False)
  334. yield format_sse_event("submitted", submitted_data)
  335. # 持续监听进度并转发(从 Redis 轮询,支持跨进程)
  336. last_progress = 10
  337. last_progress_data = None
  338. no_change_count = 0
  339. while True:
  340. try:
  341. # 从 Redis 获取最新进度(支持跨进程)
  342. progress_data = await progress_manager.get_progress(callback_task_id)
  343. if progress_data:
  344. current_progress = progress_data.get("current", last_progress)
  345. # 进度有变化或状态变化时推送
  346. if (current_progress != last_progress or
  347. progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status") if last_progress_data else True):
  348. last_progress = current_progress
  349. last_progress_data = progress_data
  350. # 转发进度事件
  351. yield format_sse_event("processing", json.dumps(progress_data, ensure_ascii=False))
  352. no_change_count = 0
  353. else:
  354. no_change_count += 1
  355. # 检查任务是否完成
  356. if progress_data.get("overall_task_status") in ["completed", "failed", "terminated"]:
  357. break
  358. await asyncio.sleep(0.5)
  359. # 每 6 秒发送一次心跳(无进度变化时,6秒 = 30 * 200ms)
  360. if no_change_count >= 30:
  361. heartbeat_data = json.dumps({
  362. "callback_task_id": callback_task_id,
  363. "user_id": user_id,
  364. "current": last_progress,
  365. "stage_name": "执行中",
  366. "status": "processing",
  367. "message": "大纲生成任务正在执行中...",
  368. "overall_task_status": "processing",
  369. "updated_at": int(time.time())
  370. }, ensure_ascii=False)
  371. yield format_sse_event("heartbeat", heartbeat_data)
  372. no_change_count = 0
  373. except Exception as e:
  374. logger.warning(f"轮询进度异常: {callback_task_id}, 错误: {str(e)}")
  375. await asyncio.sleep(0.5)
  376. # 获取最终结果
  377. final_result = await workflow_manager.get_outline_sgbx_task_info(callback_task_id)
  378. if final_result and final_result.get("status") == "completed":
  379. # 发送完成事件
  380. # 新的响应格式:
  381. # - outline_structure: 嵌套结构,每个节点包含 generated_content
  382. # - 2级和3级节点额外包含 similar_fragments
  383. # - 末级节点额外包含 key_points 和 knowledge_bases
  384. # - similar_plan: 整篇方案的相似方案推荐(顶层)
  385. completed_data = json.dumps({
  386. "callback_task_id": callback_task_id,
  387. "user_id": user_id,
  388. "current": 100,
  389. "stage_name": "大纲生成完成",
  390. "status": "completed",
  391. "message": "大纲生成任务已完成",
  392. "overall_task_status": "completed",
  393. "updated_at": int(time.time()),
  394. "result": {
  395. "outline_structure": final_result.get("results", {}).get("outline_structure", []),
  396. "similar_plan": final_result.get("results", {}).get("similar_plan", [])
  397. }
  398. }, ensure_ascii=False)
  399. yield format_sse_event("completed", completed_data)
  400. else:
  401. # 发送失败事件
  402. failed_data = json.dumps({
  403. "callback_task_id": callback_task_id,
  404. "user_id": user_id,
  405. "current": last_progress,
  406. "stage_name": "任务失败",
  407. "status": "failed",
  408. "message": final_result.get("results", {}).get("error", "大纲生成任务失败") if final_result else "任务执行失败",
  409. "overall_task_status": "failed",
  410. "updated_at": int(time.time())
  411. }, ensure_ascii=False)
  412. yield format_sse_event("failed", failed_data)
  413. except Exception as e:
  414. logger.error(f"大纲生成 SSE 事件流错误: {str(e)}", exc_info=True)
  415. # 发送错误事件
  416. error_data = json.dumps({
  417. "callback_task_id": callback_task_id,
  418. "user_id": user_id,
  419. "current": 0,
  420. "stage_name": "系统错误",
  421. "status": "error",
  422. "message": f"系统错误: {str(e)}",
  423. "overall_task_status": "failed",
  424. "updated_at": int(time.time())
  425. }, ensure_ascii=False)
  426. yield format_sse_event("error", error_data)
  427. finally:
  428. # 关闭 SSE 连接
  429. await unified_sse_manager.close_connection(callback_task_id)
  430. return StreamingResponse(
  431. generate_outline_events(),
  432. media_type="text/event-stream",
  433. headers={
  434. "Cache-Control": "no-cache",
  435. "Connection": "keep-alive",
  436. "X-Accel-Buffering": "no"
  437. }
  438. )
  439. @outline_router.post("/regenerate_outline", response_model=None)
  440. @auto_trace(generate_if_missing=True)
  441. async def regenerate_outline(request: RegenerateOutlineRequest):
  442. """
  443. 重新生成大纲接口 (SSE 流式响应)
  444. 按章或按节重新生成大纲内容。
  445. 复用大纲生成接口的请求定义,project_info 和 generation_template 为可选字段,
  446. 不传入则使用原任务的信息。
  447. Args:
  448. request: 重新生成请求参数,包含:
  449. - task_id: 原任务ID
  450. - user_id: 用户ID
  451. - project_info: 可选,项目基础信息(不传则使用原任务的)
  452. - generation_template: 可选,大纲生成模板(不传则使用原任务的)
  453. - regenerate_config: 重新生成配置
  454. Returns:
  455. StreamingResponse: SSE 事件流
  456. 示例请求:
  457. {
  458. "task_id": "task-20250130-123456",
  459. "user_id": "user-001",
  460. "generation_chapterenum": ["overview_DesignSummary_MainTechnicalStandards"],
  461. "regenerate_config": {
  462. "regenerate_mode": "chapter",
  463. "target_path": "2.1",
  464. "preserve_children": true,
  465. "reason": "调整内容结构"
  466. }
  467. }
  468. """
  469. callback_task_id = request.task_id
  470. TraceContext.set_trace_id(callback_task_id)
  471. user_id = request.user_id
  472. regenerate_config = request.regenerate_config
  473. logger.info(f"接收重新生成大纲 SSE 请求: task_id={callback_task_id}, user_id={user_id}, "
  474. f"target={regenerate_config.get('target_path', 'unknown')}")
  475. # 使用统一 SSE 管理器建立连接
  476. queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
  477. async def generate_regenerate_events() -> AsyncGenerator[str, None]:
  478. """生成重新生成 SSE 事件流"""
  479. try:
  480. # 发送连接确认事件
  481. connected_data = json.dumps({
  482. "callback_task_id": callback_task_id,
  483. "user_id": user_id,
  484. "current": 0,
  485. "stage_name": "连接建立",
  486. "status": "connected",
  487. "message": "SSE 连接已建立,正在启动重新生成任务...",
  488. "overall_task_status": "processing",
  489. "updated_at": int(time.time())
  490. }, ensure_ascii=False)
  491. yield format_sse_event("connected", connected_data)
  492. # 获取原任务信息
  493. original_task = await workflow_manager.get_outline_sgbx_task_info(callback_task_id)
  494. if not original_task:
  495. # 发送错误事件
  496. error_data = json.dumps({
  497. "callback_task_id": callback_task_id,
  498. "user_id": user_id,
  499. "current": 0,
  500. "stage_name": "任务不存在",
  501. "status": "error",
  502. "message": "原任务不存在或已过期",
  503. "overall_task_status": "failed",
  504. "updated_at": int(time.time())
  505. }, ensure_ascii=False)
  506. yield format_sse_event("error", error_data)
  507. return
  508. # 发送处理中事件
  509. processing_data = json.dumps({
  510. "callback_task_id": callback_task_id,
  511. "user_id": user_id,
  512. "current": 5,
  513. "stage_name": "准备重新生成",
  514. "status": "processing",
  515. "message": f"正在准备重新生成: {regenerate_config.get('target_path', 'unknown')}...",
  516. "overall_task_status": "processing",
  517. "updated_at": int(time.time())
  518. }, ensure_ascii=False)
  519. yield format_sse_event("processing", processing_data)
  520. # 构建任务信息
  521. # 优先使用传入的 project_info 和 generation_template,否则使用原任务的
  522. if request.project_info:
  523. base_info = request.project_info.base_info
  524. project_info_flat = {
  525. "project_name": base_info.project_name,
  526. "construct_location": base_info.construct_location,
  527. "engineering_type": base_info.engineering_type,
  528. "selectable": request.project_info.selectable or ""
  529. }
  530. else:
  531. # 使用原任务的项目信息
  532. project_info_flat = original_task.get("project_info", {})
  533. # 处理 generation_template 和 generation_chapterenum
  534. generation_template = request.generation_template
  535. if generation_template:
  536. template_data = generation_template.dict()
  537. outline_structure = template_data.get("structure", [])
  538. else:
  539. # 使用原任务的模板信息
  540. template_data = original_task.get("generation_template", {})
  541. outline_structure = original_task.get("outline_structure", [])
  542. # 优先使用传入的 generation_chapterenum,否则使用原任务的
  543. generation_chapterenum = request.generation_chapterenum if request.generation_chapterenum is not None else original_task.get("generation_chapterenum", [])
  544. # 构建任务信息(与大纲生成接口保持一致)
  545. sgbx_task_info = {
  546. "callback_task_id": callback_task_id,
  547. "user_id": user_id,
  548. "project_info": project_info_flat,
  549. "template_id": template_data.get("alias", "default_template") if template_data else "default_template",
  550. "generation_chapterenum": generation_chapterenum,
  551. "generation_template": outline_structure if outline_structure else template_data.get("structure", []),
  552. "similarity_config": {
  553. "topk_plans": 3,
  554. "topk_fragments": 10,
  555. "threshold": 0.75
  556. },
  557. "knowledge_config": {
  558. "topk": 3,
  559. "threshold": 0.75
  560. },
  561. # 重新生成特有字段
  562. "regenerate_config": regenerate_config,
  563. "is_regenerate": True
  564. }
  565. logger.info(f"重新生成任务信息构建完成: task_id={callback_task_id}, "
  566. f"target={regenerate_config.get('target_path', 'unknown')}, "
  567. f"chapters={generation_chapterenum}")
  568. # 模拟重新生成过程
  569. for progress in [20, 40, 60, 80]:
  570. await asyncio.sleep(0.5) # 模拟处理时间
  571. progress_data = json.dumps({
  572. "callback_task_id": callback_task_id,
  573. "user_id": user_id,
  574. "current": progress,
  575. "stage_name": "重新生成中",
  576. "status": "processing",
  577. "message": f"正在重新生成章节: {regenerate_config.get('target_path', 'unknown')}...",
  578. "overall_task_status": "processing",
  579. "updated_at": int(time.time())
  580. }, ensure_ascii=False)
  581. yield format_sse_event("processing", progress_data)
  582. # 发送完成事件
  583. # 响应格式与大纲生成接口保持一致
  584. completed_data = json.dumps({
  585. "callback_task_id": callback_task_id,
  586. "user_id": user_id,
  587. "current": 100,
  588. "stage_name": "重新生成完成",
  589. "status": "completed",
  590. "message": "大纲重新生成已完成",
  591. "overall_task_status": "completed",
  592. "updated_at": int(time.time()),
  593. "result": {
  594. # 复用大纲生成接口的响应格式
  595. # outline_structure: 仅包含重新生成的章节及其子章节
  596. # 结构与大纲生成接口一致,包含 generated_content/similar_fragments/key_points/knowledge_bases
  597. "outline_structure": [
  598. {
  599. "index": regenerate_config.get("target_path", "unknown"),
  600. "level": len(regenerate_config.get("target_path", "").split(".")),
  601. "title": f"重新生成的章节 {regenerate_config.get('target_path', 'unknown')}",
  602. "code": generation_chapterenum[0] if generation_chapterenum else "",
  603. "generated_content": "重新生成的内容...",
  604. "similar_fragments": [], # 2级/3级节点包含
  605. "key_points": ["要点1", "要点2"], # 末级节点包含
  606. "knowledge_bases": ["规范1", "规范2"] # 末级节点包含
  607. }
  608. ],
  609. "similar_plan": [] # 整篇方案级相似推荐
  610. }
  611. }, ensure_ascii=False)
  612. yield format_sse_event("completed", completed_data)
  613. except Exception as e:
  614. logger.error(f"重新生成大纲 SSE 事件流错误: {str(e)}", exc_info=True)
  615. # 发送错误事件
  616. error_data = json.dumps({
  617. "callback_task_id": callback_task_id,
  618. "user_id": user_id,
  619. "current": 0,
  620. "stage_name": "系统错误",
  621. "status": "error",
  622. "message": f"系统错误: {str(e)}",
  623. "overall_task_status": "failed",
  624. "updated_at": int(time.time())
  625. }, ensure_ascii=False)
  626. yield format_sse_event("error", error_data)
  627. finally:
  628. # 关闭 SSE 连接
  629. await unified_sse_manager.close_connection(callback_task_id)
  630. return StreamingResponse(
  631. generate_regenerate_events(),
  632. media_type="text/event-stream",
  633. headers={
  634. "Cache-Control": "no-cache",
  635. "Connection": "keep-alive",
  636. "X-Accel-Buffering": "no"
  637. }
  638. )
  639. # ==================== POST 接口 ====================
  640. @outline_router.post("/task_cancel")
  641. @auto_trace(generate_if_missing=True)
  642. async def task_cancel(request: TaskCancelRequest):
  643. """
  644. 取消大纲生成任务
  645. 设置任务终止信号,任务将在当前节点完成后终止
  646. Args:
  647. request: 任务取消请求参数
  648. Returns:
  649. dict: 操作结果
  650. """
  651. try:
  652. logger.info(f"接收取消任务请求: task_id={request.task_id}, user_id={request.user_id}")
  653. # 设置终止信号
  654. result = await workflow_manager.set_outline_terminate_signal(
  655. callback_task_id=request.task_id,
  656. operator=request.user_id
  657. )
  658. return {
  659. "code": 200 if result["success"] else 400,
  660. "message": result["message"],
  661. "data": result.get("sgbx_task_info")
  662. }
  663. except Exception as e:
  664. logger.error(f"取消任务失败: {str(e)}", exc_info=True)
  665. raise HTTPException(status_code=500, detail=f"取消任务失败: {str(e)}")
  666. # ==================== 查询接口 ====================
  667. @outline_router.get("/task_status")
  668. @auto_trace(generate_if_missing=True)
  669. async def task_status(
  670. task_id: str = Query(..., description="任务ID"),
  671. user_id: str = Query(..., description="用户ID")
  672. ):
  673. """
  674. 查询大纲生成任务状态
  675. Args:
  676. task_id: 任务回调ID
  677. user_id: 用户ID
  678. Returns:
  679. dict: 任务状态信息
  680. """
  681. try:
  682. logger.info(f"查询任务状态: task_id={task_id}")
  683. # 获取任务信息
  684. sgbx_task_info = await workflow_manager.get_outline_sgbx_task_info(task_id)
  685. if sgbx_task_info is None:
  686. return {
  687. "code": 404,
  688. "message": "任务不存在或已完成",
  689. "data": None
  690. }
  691. return {
  692. "code": 200,
  693. "message": "查询成功",
  694. "data": sgbx_task_info
  695. }
  696. except Exception as e:
  697. logger.error(f"查询任务状态失败: {str(e)}", exc_info=True)
  698. raise HTTPException(status_code=500, detail=f"查询任务状态失败: {str(e)}")
  699. @outline_router.get("/active_tasks")
  700. @auto_trace(generate_if_missing=True)
  701. async def active_tasks(
  702. user_id: str = Query(None, description="用户ID(可选,不提供则返回所有任务)")
  703. ):
  704. """
  705. 获取活跃的大纲生成任务列表
  706. Args:
  707. user_id: 用户ID(可选)
  708. Returns:
  709. dict: 活跃任务列表
  710. """
  711. try:
  712. logger.info(f"获取活跃任务列表: user_id={user_id}")
  713. # 获取所有活跃任务
  714. active_tasks_list = await workflow_manager.get_outline_active_tasks()
  715. # 如果指定了用户ID,则过滤
  716. if user_id:
  717. active_tasks_list = [task for task in active_tasks_list if task["user_id"] == user_id]
  718. return {
  719. "code": 200,
  720. "message": "查询成功",
  721. "data": {
  722. "total": len(active_tasks_list),
  723. "tasks": active_tasks_list
  724. }
  725. }
  726. except Exception as e:
  727. logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
  728. raise HTTPException(status_code=500, detail=f"获取活跃任务列表失败: {str(e)}")