# outline_views.py
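# -*- coding: utf-8 -*-
"""
Outline generation API endpoints (SSE version).

Integrated into the Celery + WorkflowManager architecture.

Endpoints provided:
- POST /sgbx/generating_outline: outline generation, streamed over SSE
- SSE /sgbx/regenerate_outline: outline regeneration, streamed over SSE
- POST /sgbx/task_cancel: cancel an outline generation task
- POST /sgbx/context_generate: context-based content generation, streamed over SSE (new)
"""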
  11. import uuid
  12. import json
  13. import time
  14. import asyncio
  15. import aiohttp
  16. from typing import Optional, Dict, Any, List, AsyncGenerator, Union
  17. from pydantic import BaseModel, Field
  18. from fastapi import APIRouter, HTTPException, Query
  19. from fastapi.responses import StreamingResponse
  20. from foundation.observability.logger.loggering import write_logger as logger
  21. from foundation.infrastructure.tracing import TraceContext, auto_trace
  22. from foundation.infrastructure.config.config import config_handler
  23. from core.base.workflow_manager import WorkflowManager
  24. from core.base.sse_manager import unified_sse_manager
  25. from core.base.progress_manager import ProgressManager
  26. from redis.asyncio import Redis as AsyncRedis
# Router for the outline endpoints; every route is mounted under the /sgbx prefix.
outline_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
# Module-level workflow manager, created once at import time with fixed concurrency limits.
workflow_manager = WorkflowManager(
    max_concurrent_docs=3,
    max_concurrent_reviews=5
)
# Module-level progress manager, created once at import time.
progress_manager = ProgressManager()
  36. async def sse_progress_callback(callback_task_id: str, current_data: dict):
  37. """SSE 推送回调函数 - 接收进度更新并推送到客户端"""
  38. await unified_sse_manager.send_progress(callback_task_id, current_data)
  39. def format_sse_event(event_type: str, data: str) -> str:
  40. """格式化 SSE 事件 - 按照 SSE 协议格式化事件数据"""
  41. lines = [
  42. f"event: {event_type}",
  43. f"data: {data}",
  44. "",
  45. ""
  46. ]
  47. return "\n".join(lines) + "\n"
  48. # ==================== 请求/响应模型 ====================
  49. class BaseInfo(BaseModel):
  50. """项目基础信息"""
  51. project_name: str = Field(..., description="方案名称", example="罗成依达大桥上部结构专项施工方案")
  52. construct_location: str = Field(..., description="建设地点", example="四川省凉山州")
  53. engineering_type: str = Field(..., description="方案模版类型", example="T型梁")
  54. class ProjectInfo(BaseModel):
  55. """项目信息(嵌套结构)"""
  56. base_info: BaseInfo = Field(..., description="基础信息")
  57. selectable: Optional[str] = Field("", description="其他可选信息")
  58. class TemplateStructureItem(BaseModel):
  59. """模板结构项(支持嵌套children)"""
  60. index: str = Field(..., description="章节编号", example="2")
  61. level: int = Field(..., description="层级", ge=1, le=5)
  62. title: str = Field(..., description="章节标题", example="工程概况")
  63. code: str = Field(..., description="章节代码", example="overview")
  64. # 使用 Union 支持递归类型
  65. children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
  66. class GenerationTemplate(BaseModel):
  67. """大纲生成模板
  68. 示例:
  69. {
  70. "source_file": "方案编写助手原文关键词规范文档修改版-2026-2-5.md",
  71. "alias": "施工方案知识审查与编写体系",
  72. "structure": [
  73. {
  74. "index": "2",
  75. "level": 1,
  76. "title": "工程概况",
  77. "code": "overview",
  78. "children": [...]
  79. }
  80. ]
  81. }
  82. """
  83. source_file: Optional[str] = Field(None, description="源文件", example="方案编写助手原文关键词规范文档修改版-2026-2-5.md")
  84. alias: Optional[str] = Field(None, description="别名", example="施工方案知识审查与编写体系")
  85. structure: List[Union[TemplateStructureItem, Dict[str, Any]]] = Field(..., description="模板结构")
  86. class OutlineGenerationRequest(BaseModel):
  87. """大纲生成请求
  88. 示例请求体(适配curl示例):
  89. {
  90. "user_id": "user-001",
  91. "project_info": {
  92. "base_info": {
  93. "project_name": "罗成依达大桥上部结构专项施工方案",
  94. "construct_location": "四川省凉山州",
  95. "engineering_type": "T型梁"
  96. },
  97. "selectable": ""
  98. },
  99. "generation_template": {
  100. "source_file": "方案编写助手原文关键词规范文档修改版-2026-2-5.md",
  101. "alias": "施工方案知识审查与编写体系",
  102. "structure": [...]
  103. },
  104. "generation_chapterenum": ["overview_DesignSummary_ProjectIntroduction", ...]
  105. }
  106. """
  107. user_id: str = Field(..., description="用户标识", example="user-001")
  108. project_info: ProjectInfo = Field(..., description="项目基础信息")
  109. generation_template: GenerationTemplate = Field(..., description="大纲生成模板")
  110. generation_chapterenum: List[str] = Field(default_factory=list, description="生成章节代码列表,为空时生成全部章节")
  111. class RegenerateOutlineRequest(BaseModel):
  112. """重新生成大纲请求
  113. 复用大纲生成接口的请求定义,额外添加 regenerate_config 字段用于指定重新生成配置。
  114. project_info 和 generation_template 为可选字段,不传入则使用原任务的信息。
  115. 示例请求:
  116. {
  117. "task_id": "task-20250130-123456",
  118. "user_id": "user-001",
  119. "project_info": { // 可选,不传则使用原任务的项目信息
  120. "base_info": {
  121. "project_name": "罗成依达大桥上部结构专项施工方案",
  122. "construct_location": "四川省凉山州",
  123. "engineering_type": "T型梁"
  124. },
  125. "selectable": ""
  126. },
  127. "generation_template": { // 可选,不传则使用原任务的模板
  128. "source_file": "...",
  129. "alias": "...",
  130. "structure": [...]
  131. },
  132. "generation_chapterenum": ["overview_DesignSummary_MainTechnicalStandards"], // 可选
  133. "regenerate_config": {
  134. "regenerate_mode": "chapter",
  135. "target_path": "2.1",
  136. "preserve_children": true,
  137. "reason": "调整内容结构"
  138. }
  139. }
  140. """
  141. task_id: str = Field(..., description="原大纲生成任务ID")
  142. user_id: str = Field(..., description="用户ID")
  143. # 可选:复用大纲生成接口的项目信息(不传则使用原任务的)
  144. project_info: Optional[ProjectInfo] = Field(None, description="项目基础信息(可选)")
  145. # 可选:复用大纲生成接口的模板(不传则使用原任务的)
  146. generation_template: Optional[GenerationTemplate] = Field(None, description="大纲生成模板(可选)")
  147. # 可选:复用大纲生成接口的章节代码列表
  148. generation_chapterenum: Optional[List[str]] = Field(None, description="生成章节代码列表(可选)")
  149. # 重新生成特有的配置
  150. regenerate_config: Dict[str, Any] = Field(..., description="重新生成配置")
  151. class TaskCancelRequest(BaseModel):
  152. """任务取消请求"""
  153. task_id: str = Field(..., description="任务ID")
  154. user_id: str = Field(..., description="用户ID")
  155. cancel_reason: Optional[str] = Field("用户主动取消", description="取消原因")
  156. # ==================== 响应模型 ====================
  157. class OutlineNodeResponse(BaseModel):
  158. """大纲节点响应模型
  159. 与请求的 TemplateStructureItem 对应,增加以下字段:
  160. - 每级节点都包含 generated_content
  161. - 2级和3级节点包含 similar_fragments
  162. - 末级节点额外包含 key_points 和 knowledge_bases
  163. """
  164. index: str = Field(..., description="章节编号", example="2.1.1")
  165. level: int = Field(..., description="层级", ge=1, le=5, example=3)
  166. title: str = Field(..., description="章节标题", example="工程简介")
  167. code: str = Field(..., description="章节代码", example="overview_DesignSummary_ProjectIntroduction")
  168. generated_content: str = Field(..., description="AI生成的内容", example="罗成依达大桥位于四川省凉山州...")
  169. # 2级和3级节点包含 similar_fragments
  170. similar_fragments: Optional[List[Dict[str, Any]]] = Field(None, description="相似片段推荐(2级和3级节点)")
  171. key_points: Optional[List[str]] = Field(None, description="核心要点(仅末级节点)", example=["桥位位置", "桥梁规模"])
  172. knowledge_bases: Optional[List[str]] = Field(None, description="知识点/编制依据(仅末级节点)", example=["《公路桥涵设计通用规范》JTG D60-2015"])
  173. children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
  174. class SimilarPlanResponse(BaseModel):
  175. """相似方案响应(整篇方案级推荐)"""
  176. plan_id: str = Field(..., description="方案ID")
  177. plan_title: str = Field(..., description="方案标题")
  178. similarity_score: float = Field(..., description="相似度分数", ge=0.0, le=1.0)
  179. plan_type: str = Field(..., description="方案类型")
  180. outline: Optional[List[Dict]] = Field(None, description="方案大纲结构")
  181. metadata: Optional[Dict[str, Any]] = Field(None, description="元数据")
  182. class SimilarFragmentResponse(BaseModel):
  183. """相似片段响应"""
  184. fragment_id: str = Field(..., description="片段ID")
  185. section_path: str = Field(..., description="所属章节路径")
  186. section_title: str = Field(..., description="章节标题")
  187. fragment_content: str = Field(..., description="片段内容")
  188. similarity_score: float = Field(..., description="相似度分数", ge=0.0, le=1.0)
  189. source_document_id: str = Field(..., description="来源文档ID")
  190. source_document_title: str = Field(..., description="来源文档标题")
  191. class OutlineGenerationResult(BaseModel):
  192. """大纲生成结果
  193. 结构说明:
  194. - outline_structure: 嵌套大纲结构,每个节点包含 generated_content
  195. - 2级和3级节点额外包含 similar_fragments
  196. - 末级节点额外包含 key_points 和 knowledge_bases
  197. - similar_plan: 整篇方案的相似方案推荐(顶层)
  198. """
  199. outline_structure: List[OutlineNodeResponse] = Field(..., description="大纲结构(包含AI生成内容和章节级similar_fragments)")
  200. similar_plan: List[SimilarPlanResponse] = Field(default_factory=list, description="相似方案推荐(整篇方案级)")
  201. # ==================== 上下文生成新增模型 ====================
  202. class CompletionConfig(BaseModel):
  203. section_path: str = Field(..., description="章节路径")
  204. current_content: str = Field(default="", description="当前已有内容")
  205. context_window: int = Field(default=2000, ge=500, le=5000)
  206. completion_mode: str = Field(default="continue", description="模式")
  207. target_length: int = Field(default=1000, ge=100, le=5000)
  208. include_references: bool = Field(default=True)
  209. style_match: bool = Field(default=True)
  210. hint_keywords: Optional[List[str]] = Field(default=None)
  211. class ProjectInfoSimple(BaseModel):
  212. project_name: str = Field(default="施工方案")
  213. construct_location: Optional[str] = Field(default=None)
  214. engineering_type: Optional[str] = Field(default=None)
  215. class ContextGenerateRequest(BaseModel):
  216. task_id: Optional[str] = Field(default=None)
  217. user_id: str = Field(...)
  218. project_info: Optional[ProjectInfoSimple] = Field(default=None)
  219. completion_config: CompletionConfig = Field(...)
  220. model_name: Optional[str] = Field(default=None)
  221. class Config: extra = "forbid"
  222. class ContextGenerateResponse(BaseModel):
  223. code: int
  224. message: str
  225. data: Optional[Dict[str, Any]] = None
  226. # ==================== 全局资源池 (速度优化核心) ====================
  227. GLOBAL_HTTP_SESSION: Optional[aiohttp.ClientSession] = None
  228. async def init_global_resources():
  229. """初始化全局连接池"""
  230. global GLOBAL_HTTP_SESSION, GLOBAL_REDIS_CLIENT
  231. if GLOBAL_HTTP_SESSION is None or GLOBAL_HTTP_SESSION.closed:
  232. # 增加 DNS 缓存和连接复用,针对阿里云域名优化
  233. connector = aiohttp.TCPConnector(limit=100, limit_per_host=20, ttl_dns_cache=300, force_close=False)
  234. GLOBAL_HTTP_SESSION = aiohttp.ClientSession(
  235. timeout=aiohttp.ClientTimeout(total=120, connect=10, sock_read=10), # 连接超时稍长以防网络波动
  236. connector=connector,
  237. headers={"User-Agent": "FastAPI-DashScope-Optimized/2.0"}
  238. )
  239. logger.info("✅ 全局 HTTP 连接池已初始化 (DashScope Ready)")
  240. if GLOBAL_REDIS_CLIENT is None:
  241. try:
  242. GLOBAL_REDIS_CLIENT = AsyncRedis(
  243. host='127.0.0.1', port=6379, password='123456', db=0,
  244. decode_responses=True, socket_connect_timeout=1,
  245. socket_keepalive=True, max_connections=50
  246. )
  247. asyncio.create_task(_background_ping())
  248. logger.info("✅ 全局 Redis 连接池已初始化")
  249. except Exception as e:
  250. logger.warning(f"⚠️ Redis 初始化失败: {e}")
  251. GLOBAL_REDIS_CLIENT = None
  252. async def _background_ping():
  253. if GLOBAL_REDIS_CLIENT:
  254. try: await GLOBAL_REDIS_CLIENT.ping()
  255. except: pass
  256. async def get_http_session():
  257. if GLOBAL_HTTP_SESSION is None or GLOBAL_HTTP_SESSION.closed:
  258. await init_global_resources()
  259. return GLOBAL_HTTP_SESSION
  260. async def get_redis_client():
  261. if GLOBAL_REDIS_CLIENT is None:
  262. await init_global_resources()
  263. return GLOBAL_REDIS_CLIENT
  264. # ==================== 自定义 API 配置 (阿里云 DashScope) ====================
  265. class CustomAPIConfig:
  266. # 【关键修改】阿里云 DashScope 兼容模式地址
  267. # 注意:必须包含 /chat/completions 后缀
  268. DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
  269. DASHSCOPE_CHAT_URL = f"{DASHSCOPE_BASE_URL}/chat/completions"
  270. # 【关键修改】您的 API Key
  271. DASHSCOPE_API_KEY = "sk-ae805c991b6a4a8da3a09351c34963a5"
  272. # 【关键修改】目标模型
  273. DEFAULT_MODEL_NAME = "qwen3-30b-a3b-instruct-2507"
  274. @staticmethod
  275. def get_api_url() -> str:
  276. # 优先使用硬编码的阿里云地址
  277. return CustomAPIConfig.DASHSCOPE_CHAT_URL
  278. @staticmethod
  279. def get_api_key() -> str:
  280. return CustomAPIConfig.DASHSCOPE_API_KEY
  281. @staticmethod
  282. def get_model_name() -> str:
  283. # 允许配置覆盖,否则使用默认
  284. configured_model = config_handler.get("custom_api", "MODEL_NAME", "")
  285. return configured_model if configured_model else CustomAPIConfig.DEFAULT_MODEL_NAME
  286. @staticmethod
  287. def is_enabled() -> bool:
  288. # 只要 Key 不为空即启用
  289. return bool(CustomAPIConfig.get_api_key()) and bool(CustomAPIConfig.get_api_url())
  290. # ==================== 极速流式调用 (核心优化) ====================
  291. async def call_custom_api_stream(
  292. prompt: str, system_prompt: str = "", max_tokens: int = 2000,
  293. temperature: float = 0.7, trace_id: str = ""
  294. ) -> AsyncGenerator[tuple[str, Optional[float]], None]:
  295. api_url = CustomAPIConfig.get_api_url()
  296. model_name = CustomAPIConfig.get_model_name()
  297. api_key = CustomAPIConfig.get_api_key()
  298. logger.debug(f"[{trace_id}] 正在调用阿里云 DashScope: {model_name} @ {api_url}")
  299. # 截断过长的 Prompt (阿里云对输入长度有限制,且为了速度)
  300. max_prompt_len = 10000
  301. if len(prompt) > max_prompt_len:
  302. prompt = prompt[-max_prompt_len:]
  303. logger.debug(f"[{trace_id}] Prompt 已截断至 {max_prompt_len} 字符")
  304. payload = {
  305. "model": model_name,
  306. "messages": [
  307. {"role": "system", "content": system_prompt},
  308. {"role": "user", "content": prompt}
  309. ],
  310. "max_tokens": max_tokens,
  311. "temperature": temperature,
  312. "stream": True,
  313. "incremental_output": True # 阿里云兼容模式可能支持此参数,优化流式体验
  314. }
  315. headers = {
  316. "Content-Type": "application/json",
  317. "Authorization": f"Bearer {api_key}"
  318. }
  319. start_time = time.time()
  320. first_token_time: Optional[float] = None
  321. buffer = ""
  322. session = await get_http_session()
  323. try:
  324. # 阿里云 HTTPS 连接,保持 read_bufsize=1 以获取最快首字
  325. async with session.post(api_url, json=payload, headers=headers, read_bufsize=1) as response:
  326. if response.status != 200:
  327. error_text = await response.text()
  328. logger.error(f"[{trace_id}] API 错误 {response.status}: {error_text}")
  329. raise Exception(f"API 错误 {response.status}: {error_text}")
  330. async for chunk in response.content.iter_any():
  331. if not chunk: continue
  332. try:
  333. text = chunk.decode('utf-8', errors='ignore')
  334. if not text: continue
  335. buffer += text
  336. while '\n' in buffer:
  337. line, buffer = buffer.split('\n', 1)
  338. line = line.strip()
  339. if line.startswith('data: '):
  340. data = line[6:]
  341. if data == '[DONE]':
  342. return
  343. try:
  344. event_data = json.loads(data)
  345. # 处理阿里云可能的错误格式
  346. if "error" in event_data:
  347. err_msg = event_data["error"].get("message", "Unknown Error")
  348. logger.error(f"[{trace_id}] 流式数据中包含错误: {err_msg}")
  349. continue
  350. choices = event_data.get("choices", [])
  351. if choices:
  352. delta = choices[0].get("delta", {})
  353. content = delta.get("content", "")
  354. if content:
  355. if first_token_time is None:
  356. first_token_time = time.time() - start_time
  357. yield (content, first_token_time)
  358. except json.JSONDecodeError:
  359. continue
  360. except UnicodeDecodeError:
  361. continue
  362. except Exception as e:
  363. logger.error(f"[{trace_id}] API 流式请求异常: {e}")
  364. raise
  365. # ==================== 上下文生成业务逻辑辅助 ====================
# System prompt passed to the model by generate_content_stream.
# NOTE(review): the prompt text limits the output to 100 characters, while
# CompletionConfig.target_length accepts 100-5000 (default 1000) - confirm
# which limit is intended.
CONTEXT_GENERATE_SYSTEM_PROMPT = "你是一位专业的施工方案编写专家。请直接输出生成的内容文本,不要添加任何解释、标注或格式标记。要求生成的内容不超过100字。"
  367. def build_context_generate_prompt(project_info, section_path, section_title, current_content, completion_mode, target_length, include_references, style_match, hint_keywords, context_before="", context_after=""):
  368. parts = []
  369. parts.append(f"【项目】{project_info.get('project_name', '未知')}")
  370. parts.append(f"【章节】{section_title} ({section_path})")
  371. parts.append(f"【模式】{completion_mode} (目标:{target_length})")
  372. if context_before: parts.append(f"【前文】...{context_before[-500:]}")
  373. if current_content: parts.append(f"【当前】{current_content}")
  374. if context_after: parts.append(f"【后文】{context_after[:500]}...")
  375. parts.append("【指令】请根据上述信息继续生成专业内容,直接输出正文:")
  376. return "\n".join(parts)
  377. def extract_chunk_content(chunk: Any) -> str:
  378. if isinstance(chunk, str): return chunk
  379. if hasattr(chunk, 'content'): return str(chunk.content) if chunk.content else ""
  380. if isinstance(chunk, dict): return str(chunk.get('content', ''))
  381. return str(chunk)
  382. def validate_user_id(user_id: str):
  383. supported_users = {'user-001', 'user-002', 'user-003'}
  384. if user_id not in supported_users:
  385. raise HTTPException(status_code=403, detail={"code": "INVALID_USER", "message": "用户标识无效"})
  386. def validate_completion_config(config: CompletionConfig):
  387. if not config.section_path or not all(p.isdigit() for p in config.section_path.split(".")):
  388. raise HTTPException(status_code=400, detail={"code": "INVALID_PATH", "message": "章节路径格式错误"})
  389. def validate_request(request: ContextGenerateRequest):
  390. if not request.task_id and not request.project_info:
  391. raise HTTPException(status_code=400, detail={"code": "MISSING_INFO", "message": "缺少任务 ID 或项目信息"})
  392. # ==================== 上下文生成核心流式逻辑 ====================
  393. async def generate_content_stream(callback_task_id, source_task_id, user_id, request, redis_client):
  394. async def is_cancelled() -> bool:
  395. if not redis_client: return False
  396. try: return await redis_client.exists(f"terminate:{callback_task_id}") > 0
  397. except: return False
  398. stream_start_time = time.time()
  399. first_token_latency: Optional[float] = None
  400. full_content_parts: List[str] = []
  401. chunk_count = 0
  402. try:
  403. yield format_sse_event("connected", json.dumps({
  404. "callback_task_id": callback_task_id, "status": "connected", "timestamp": int(time.time())
  405. }, ensure_ascii=False))
  406. project_info = request.project_info.dict() if request.project_info else {}
  407. section_title = f"章节 {request.completion_config.section_path}"
  408. user_prompt = build_context_generate_prompt(
  409. project_info=project_info,
  410. section_path=request.completion_config.section_path,
  411. section_title=section_title,
  412. current_content=request.completion_config.current_content,
  413. completion_mode=request.completion_config.completion_mode,
  414. target_length=request.completion_config.target_length,
  415. include_references=request.completion_config.include_references,
  416. style_match=request.completion_config.style_match,
  417. hint_keywords=request.completion_config.hint_keywords
  418. )
  419. yield format_sse_event("generating", json.dumps({
  420. "status": "generating",
  421. "message": f"正在调用阿里云 Qwen3 ({CustomAPIConfig.get_model_name()})...",
  422. "timestamp": int(time.time())
  423. }, ensure_ascii=False))
  424. # 执行生成
  425. if CustomAPIConfig.is_enabled():
  426. logger.info(f"[{callback_task_id}] 使用阿里云 DashScope API (模型:{CustomAPIConfig.get_model_name()})")
  427. async for content, ftl in call_custom_api_stream(
  428. prompt=user_prompt,
  429. system_prompt=CONTEXT_GENERATE_SYSTEM_PROMPT,
  430. max_tokens=min(request.completion_config.target_length, 4000),
  431. temperature=0.7,
  432. trace_id=callback_task_id
  433. ):
  434. if await is_cancelled():
  435. yield format_sse_event("cancelled", json.dumps({"status": "cancelled"}, ensure_ascii=False))
  436. return
  437. if content:
  438. full_content_parts.append(content)
  439. chunk_count += 1
  440. if first_token_latency is None:
  441. first_token_latency = ftl if ftl is not None else (time.time() - stream_start_time)
  442. logger.info(f"[{callback_task_id}] ⚡ 首字延迟: {first_token_latency:.3f}s (Model: {CustomAPIConfig.get_model_name()})")
  443. yield format_sse_event("chunk", json.dumps({
  444. "chunk": content,
  445. "first_token_latency": round(first_token_latency, 3),
  446. "timestamp": int(time.time())
  447. }, ensure_ascii=False))
  448. else:
  449. # 备用逻辑 (理论上不会触发,因为 Key 已硬编码)
  450. logger.warning(f"[{callback_task_id}] API 配置失效,回退到默认模型 (不应发生)")
  451. raise Exception("API 配置未生效,请检查 CustomAPIConfig")
  452. # 完成统计
  453. total_duration = time.time() - stream_start_time
  454. full_content = "".join(full_content_parts)
  455. logger.info(f"[{callback_task_id}] ✅ 完成 | 首字: {first_token_latency:.3f}s | 总耗时: {total_duration:.3f}s | 字数: {len(full_content)}")
  456. yield format_sse_event("completed", json.dumps({
  457. "callback_task_id": callback_task_id,
  458. "status": "completed",
  459. "metrics": {
  460. "first_token_latency": round(first_token_latency, 3) if first_token_latency else 0.0,
  461. "total_duration": round(total_duration, 3),
  462. "char_count": len(full_content),
  463. "chunk_count": chunk_count,
  464. "model_used": CustomAPIConfig.get_model_name()
  465. },
  466. "full_content": full_content,
  467. "timestamp": int(time.time())
  468. }, ensure_ascii=False))
  469. except Exception as e:
  470. logger.error(f"[{callback_task_id}] ❌ 异常: {str(e)}", exc_info=True)
  471. yield format_sse_event("error", json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False))
  472. # ==================== 上下文生成API路由 ====================
# Router for the context-generation endpoints; it uses the same /sgbx prefix
# and tag as outline_router.
context_generate_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
  474. @context_generate_router.post("/context_generate")
  475. @auto_trace(generate_if_missing=True)
  476. async def context_generate(request: ContextGenerateRequest):
  477. callback_task_id = f"ctx_{uuid.uuid4().hex[:12]}"
  478. TraceContext.set_trace_id(callback_task_id)
  479. receive_time = time.time()
  480. try:
  481. validate_user_id(request.user_id)
  482. validate_completion_config(request.completion_config)
  483. validate_request(request)
  484. redis_client = await get_redis_client()
  485. logger.info(f"[{callback_task_id}] 请求接收 (预处理耗时: {(time.time()-receive_time)*1000:.1f}ms)")
  486. return StreamingResponse(
  487. generate_content_stream(callback_task_id, request.task_id, request.user_id, request, redis_client),
  488. media_type="text/event-stream",
  489. headers={
  490. "Cache-Control": "no-cache, no-store, must-revalidate",
  491. "Pragma": "no-cache",
  492. "Expires": "0",
  493. "Connection": "keep-alive",
  494. "X-Accel-Buffering": "no",
  495. "Content-Type": "text/event-stream; charset=utf-8",
  496. "Access-Control-Allow-Origin": "*"
  497. }
  498. )
  499. except HTTPException:
  500. raise
  501. except Exception as e:
  502. logger.error(f"[{callback_task_id}] 全局异常: {str(e)}")
  503. raise HTTPException(status_code=500, detail=str(e))
  504. @context_generate_router.get("/context_generate_health")
  505. async def health_check():
  506. return {
  507. "status": "healthy",
  508. "provider": "Aliyun DashScope",
  509. "current_model": CustomAPIConfig.get_model_name(),
  510. "api_url_prefix": "https://dashscope.aliyuncs.com/compatible-mode/v1"
  511. }
  512. @context_generate_router.get("/context_generate_modes", response_model=ContextGenerateResponse)
  513. async def get_modes():
  514. modes = [
  515. {"mode": "continue", "name": "续写"}, {"mode": "expand", "name": "扩写"},
  516. {"mode": "polish", "name": "润色"}, {"mode": "complete", "name": "补全"}
  517. ]
  518. return ContextGenerateResponse(code=200, message="success", data={"modes": modes})
  519. @context_generate_router.get("/context_generate_api_status", response_model=ContextGenerateResponse)
  520. async def get_api_status():
  521. enabled = CustomAPIConfig.is_enabled()
  522. return ContextGenerateResponse(
  523. code=200, message="success",
  524. data={
  525. "enabled": enabled,
  526. "provider": "Aliyun DashScope",
  527. "model": CustomAPIConfig.get_model_name()
  528. }
  529. )
  530. # ==================== 原有大纲生成接口实现 ====================
  531. @outline_router.post("/generating_outline", response_model=None)
  532. @auto_trace(generate_if_missing=True)
  533. async def generating_outline(request: OutlineGenerationRequest):
  534. """
  535. 大纲生成接口 (SSE 流式响应)
  536. """
  537. callback_task_id = f"outline_{uuid.uuid4().hex[:16]}"
  538. TraceContext.set_trace_id(callback_task_id)
  539. user_id = request.user_id
  540. logger.info(f"接收大纲生成 SSE 请求: user_id={user_id}, project={request.project_info.base_info.project_name}")
  541. # ===== 新增:创建 Redis 连接用于检查终止标志 =====
  542. redis_check_client = None
  543. try:
  544. redis_check_client = AsyncRedis(
  545. host='127.0.0.1',
  546. port=6379,
  547. password='123456',
  548. db=0,
  549. decode_responses=True,
  550. socket_connect_timeout=2,
  551. socket_timeout=2
  552. )
  553. except Exception as e:
  554. logger.warning(f"[{callback_task_id}] 创建取消检查Redis连接失败: {e}")
  555. # ===== 新增:定义取消检查函数 =====
  556. async def is_task_cancelled() -> bool:
  557. """检查任务是否被取消"""
  558. if not redis_check_client or not callback_task_id:
  559. return False
  560. try:
  561. return await redis_check_client.exists(f"terminate:{callback_task_id}") > 0
  562. except Exception:
  563. return False
  564. # 使用统一 SSE 管理器建立连接并注册回调
  565. queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
  566. async def generate_outline_events() -> AsyncGenerator[str, None]:
  567. """生成大纲生成 SSE 事件流"""
  568. try:
  569. # ===== 检查点1: 函数开始 =====
  570. if await is_task_cancelled():
  571. logger.info(f"[{callback_task_id}] 连接建立前检测到取消信号")
  572. cancelled_data = json.dumps({
  573. "callback_task_id": callback_task_id,
  574. "user_id": user_id,
  575. "current": 0,
  576. "stage_name": "任务已取消",
  577. "status": "cancelled",
  578. "message": "任务已被用户取消",
  579. "overall_task_status": "cancelled",
  580. "updated_at": int(time.time())
  581. }, ensure_ascii=False)
  582. yield format_sse_event("cancelled", cancelled_data)
  583. return
  584. # 发送连接确认事件
  585. connected_data = json.dumps({
  586. "callback_task_id": callback_task_id,
  587. "user_id": user_id,
  588. "current": 0,
  589. "stage_name": "连接建立",
  590. "status": "connected",
  591. "message": "SSE 连接已建立,正在启动大纲生成任务...",
  592. "overall_task_status": "processing",
  593. "updated_at": int(time.time())
  594. }, ensure_ascii=False)
  595. yield format_sse_event("connected", connected_data)
  596. # 构建任务信息
  597. base_info = request.project_info.base_info
  598. project_info_flat = {
  599. "project_name": base_info.project_name,
  600. "construct_location": base_info.construct_location,
  601. "engineering_type": base_info.engineering_type,
  602. "selectable": request.project_info.selectable or ""
  603. }
  604. sgbx_task_info = {
  605. "callback_task_id": callback_task_id,
  606. "user_id": user_id,
  607. "project_info": project_info_flat,
  608. "template_id": request.generation_template.alias or "default_template",
  609. "generation_chapterenum": request.generation_chapterenum,
  610. "generation_template": [
  611. item.dict() if isinstance(item, TemplateStructureItem) else item
  612. for item in request.generation_template.structure
  613. ],
  614. "similarity_config": {
  615. "topk_plans": 3,
  616. "topk_fragments": 10,
  617. "threshold": 0.75
  618. },
  619. "knowledge_config": {
  620. "topk": 3,
  621. "threshold": 0.75
  622. },
  623. }
  624. # ===== 检查点2: 任务提交前 =====
  625. if await is_task_cancelled():
  626. logger.info(f"[{callback_task_id}] 任务提交前检测到取消信号")
  627. cancelled_data = json.dumps({
  628. "callback_task_id": callback_task_id,
  629. "user_id": user_id,
  630. "current": 0,
  631. "stage_name": "任务已取消",
  632. "status": "cancelled",
  633. "message": "任务已被用户取消",
  634. "overall_task_status": "cancelled",
  635. "updated_at": int(time.time())
  636. }, ensure_ascii=False)
  637. yield format_sse_event("cancelled", cancelled_data)
  638. return
  639. # 发送处理中事件
  640. processing_data = json.dumps({
  641. "callback_task_id": callback_task_id,
  642. "user_id": user_id,
  643. "current": 5,
  644. "stage_name": "任务提交中",
  645. "status": "processing",
  646. "message": "正在提交大纲生成任务...",
  647. "overall_task_status": "processing",
  648. "updated_at": int(time.time())
  649. }, ensure_ascii=False)
  650. yield format_sse_event("processing", processing_data)
  651. # 提交任务到 Celery
  652. celery_task_id = await workflow_manager.submit_outline_generation_task(sgbx_task_info)
  653. logger.info(f"大纲生成任务已提交: callback_task_id={callback_task_id}, celery_task_id={celery_task_id}")
  654. # 发送任务提交成功事件
  655. submitted_data = json.dumps({
  656. "callback_task_id": callback_task_id,
  657. "user_id": user_id,
  658. "current": 10,
  659. "stage_name": "任务已提交",
  660. "status": "submitted",
  661. "message": "大纲生成任务已提交,正在执行...",
  662. "overall_task_status": "processing",
  663. "updated_at": int(time.time()),
  664. "celery_task_id": celery_task_id
  665. }, ensure_ascii=False)
  666. yield format_sse_event("submitted", submitted_data)
  667. # 持续监听进度并转发
  668. last_progress = 10
  669. last_progress_data = None
  670. last_event_type = "processing"
  671. last_message = ""
  672. no_change_count = 0
  673. while True:
  674. try:
  675. # ===== 检查点3: 每次轮询前检查取消 =====
  676. if await is_task_cancelled():
  677. logger.info(f"[{callback_task_id}] 进度轮询中检测到取消信号")
  678. cancelled_data = json.dumps({
  679. "callback_task_id": callback_task_id,
  680. "user_id": user_id,
  681. "current": last_progress,
  682. "stage_name": "任务已取消",
  683. "status": "cancelled",
  684. "message": "任务已被用户取消",
  685. "overall_task_status": "cancelled",
  686. "updated_at": int(time.time())
  687. }, ensure_ascii=False)
  688. yield format_sse_event("cancelled", cancelled_data)
  689. return
  690. # 从 Redis 获取最新进度
  691. progress_data = await progress_manager.get_progress(callback_task_id)
  692. if progress_data:
  693. current_progress = progress_data.get("current", last_progress)
  694. current_event_type = progress_data.get("event_type", "processing")
  695. current_message = progress_data.get("message", "")
  696. # 检查进度数据中是否已经是取消状态
  697. if progress_data.get("overall_task_status") == "cancelled":
  698. logger.info(f"[{callback_task_id}] 从进度数据检测到取消状态")
  699. yield format_sse_event("cancelled", json.dumps(progress_data, ensure_ascii=False))
  700. return
  701. # 进度有变化时推送
  702. should_push = False
  703. if current_progress != last_progress:
  704. should_push = True
  705. elif current_event_type != last_event_type:
  706. should_push = True
  707. elif current_message != last_message:
  708. should_push = True
  709. elif last_progress_data is None:
  710. should_push = True
  711. elif progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status"):
  712. should_push = True
  713. if should_push:
  714. last_progress = current_progress
  715. last_event_type = current_event_type
  716. last_message = current_message
  717. last_progress_data = progress_data
  718. yield format_sse_event("processing", json.dumps(progress_data, ensure_ascii=False))
  719. no_change_count = 0
  720. else:
  721. no_change_count += 1
  722. # 检查任务状态
  723. status = progress_data.get("overall_task_status")
  724. # ===== 新增:检测到取消立即返回 =====
  725. if status == "cancelled":
  726. logger.info(f"[{callback_task_id}] 检测到任务已取消")
  727. yield format_sse_event("cancelled", json.dumps(progress_data, ensure_ascii=False))
  728. return
  729. # 检查任务是否完成
  730. if status in ["completed", "failed", "terminated"]:
  731. break
  732. await asyncio.sleep(0.5)
  733. # 每 6 秒发送一次心跳
  734. if no_change_count >= 30:
  735. heartbeat_data = json.dumps({
  736. "callback_task_id": callback_task_id,
  737. "user_id": user_id,
  738. "current": last_progress,
  739. "stage_name": "执行中",
  740. "status": "processing",
  741. "message": "大纲生成任务正在执行中...",
  742. "overall_task_status": "processing",
  743. "updated_at": int(time.time())
  744. }, ensure_ascii=False)
  745. yield format_sse_event("heartbeat", heartbeat_data)
  746. no_change_count = 0
  747. except Exception as e:
  748. logger.warning(f"轮询进度异常: {callback_task_id}, 错误: {str(e)}")
  749. await asyncio.sleep(0.5)
  750. # 获取最终结果
  751. # 获取最终结果
  752. final_result = await workflow_manager.get_outline_sgbx_task_info(callback_task_id)
  753. # ===== 检查点4: 结果返回前检查取消 =====
  754. if await is_task_cancelled():
  755. logger.info(f"[{callback_task_id}] 结果返回前检测到取消信号")
  756. cancelled_data = json.dumps({
  757. "callback_task_id": callback_task_id,
  758. "user_id": user_id,
  759. "current": last_progress,
  760. "stage_name": "任务已取消",
  761. "status": "cancelled",
  762. "message": "任务已被用户取消",
  763. "overall_task_status": "cancelled",
  764. "updated_at": int(time.time())
  765. }, ensure_ascii=False)
  766. yield format_sse_event("cancelled", cancelled_data)
  767. return
  768. # ===== 新增:检查任务结果是否为已取消 =====
  769. if final_result and final_result.get("status") == "cancelled":
  770. logger.info(f"[{callback_task_id}] 任务结果状态为已取消,不返回实际结果")
  771. cancelled_data = json.dumps({
  772. "callback_task_id": callback_task_id,
  773. "user_id": user_id,
  774. "current": last_progress,
  775. "stage_name": "任务已取消",
  776. "status": "cancelled",
  777. "message": final_result.get("message", "任务已被用户取消"),
  778. "overall_task_status": "cancelled",
  779. "updated_at": int(time.time())
  780. }, ensure_ascii=False)
  781. yield format_sse_event("cancelled", cancelled_data)
  782. return
  783. if final_result and final_result.get("status") == "completed":
  784. completed_data = json.dumps({
  785. "callback_task_id": callback_task_id,
  786. "user_id": user_id,
  787. "current": 100,
  788. "stage_name": "大纲生成完成",
  789. "status": "completed",
  790. "message": "大纲生成任务已完成",
  791. "overall_task_status": "completed",
  792. "updated_at": int(time.time()),
  793. "result": {
  794. "outline_structure": final_result.get("results", {}).get("outline_structure", []),
  795. "similar_plan": final_result.get("results", {}).get("similar_plan", [])
  796. }
  797. }, ensure_ascii=False)
  798. yield format_sse_event("completed", completed_data)
  799. else:
  800. failed_data = json.dumps({
  801. "callback_task_id": callback_task_id,
  802. "user_id": user_id,
  803. "current": last_progress,
  804. "stage_name": "任务失败",
  805. "status": "failed",
  806. "message": final_result.get("results", {}).get("error", "大纲生成任务失败") if final_result else "任务执行失败",
  807. "overall_task_status": "failed",
  808. "updated_at": int(time.time())
  809. }, ensure_ascii=False)
  810. yield format_sse_event("failed", failed_data)
  811. except Exception as e:
  812. logger.error(f"大纲生成 SSE 事件流错误: {str(e)}", exc_info=True)
  813. error_data = json.dumps({
  814. "callback_task_id": callback_task_id,
  815. "user_id": user_id,
  816. "current": 0,
  817. "stage_name": "系统错误",
  818. "status": "error",
  819. "message": f"系统错误: {str(e)}",
  820. "overall_task_status": "failed",
  821. "updated_at": int(time.time())
  822. }, ensure_ascii=False)
  823. yield format_sse_event("error", error_data)
  824. finally:
  825. # 关闭 Redis 连接
  826. if redis_check_client:
  827. try:
  828. await redis_check_client.close()
  829. except Exception:
  830. pass
  831. # 关闭 SSE 连接
  832. await unified_sse_manager.close_connection(callback_task_id)
  833. return StreamingResponse(
  834. generate_outline_events(),
  835. media_type="text/event-stream",
  836. headers={
  837. "Cache-Control": "no-cache",
  838. "Connection": "keep-alive",
  839. "X-Accel-Buffering": "no"
  840. }
  841. )
  842. @outline_router.post("/regenerate_outline", response_model=None)
  843. @auto_trace(generate_if_missing=True)
  844. async def regenerate_outline(request: RegenerateOutlineRequest):
  845. """
  846. 重新生成大纲接口 (SSE 流式响应)
  847. 【任务状态管理】
  848. - 重新生成会创建**新任务**,原任务状态保持不变
  849. - 新任务通过 regenerate_config.source_task_id 关联原任务
  850. - 原任务仍可查询,不受影响
  851. 【字段说明】
  852. - generation_chapterenum: 可选,默认使用原任务的章节列表
  853. - project_info: 可选,默认使用原任务的项目信息
  854. - generation_template: 可选,默认使用原任务的模板
  855. 【错误处理】
  856. - 原任务不存在: 返回 404 错误事件
  857. - 原任务已完成/失败: 允许重新生成(基于已完成结果进行局部调整)
  858. - 重新生成配置缺失: 返回 400 错误事件
  859. 【与 /generating_outline 的复用关系】
  860. - 复用 generating_outline 的核心 SSE 事件流生成逻辑
  861. - 差异点:1) 构建任务信息时合并原任务数据 2) 添加 regenerate_config 标记
  862. """
  863. # ===== 1. 参数校验 =====
  864. if not request.regenerate_config:
  865. logger.error("重新生成配置缺失")
  866. raise HTTPException(status_code=400, detail="regenerate_config 为必填项")
  867. # 生成新任务ID(重要:重新生成创建新任务,不覆盖原任务)
  868. new_callback_task_id = f"outline_regen_{uuid.uuid4().hex[:16]}"
  869. source_task_id = request.task_id # 原任务ID用于数据查询
  870. TraceContext.set_trace_id(new_callback_task_id)
  871. user_id = request.user_id
  872. regenerate_config = request.regenerate_config
  873. logger.info(f"接收重新生成大纲 SSE 请求: "
  874. f"source_task_id={source_task_id}, "
  875. f"new_task_id={new_callback_task_id}, "
  876. f"user_id={user_id}, "
  877. f"target={regenerate_config.get('target_path', 'unknown')}")
  878. # ===== 2. 获取原任务信息(带错误处理)=====
  879. original_task = None
  880. try:
  881. original_task = await workflow_manager.get_outline_sgbx_task_info(source_task_id)
  882. except Exception as e:
  883. logger.warning(f"获取原任务信息异常: {source_task_id}, error={e}")
  884. # 原任务不存在处理
  885. if not original_task:
  886. logger.error(f"原任务不存在: {source_task_id}")
  887. async def error_not_found():
  888. error_data = json.dumps({
  889. "callback_task_id": new_callback_task_id,
  890. "source_task_id": source_task_id,
  891. "user_id": user_id,
  892. "current": 0,
  893. "stage_name": "原任务不存在",
  894. "status": "error",
  895. "message": f"原任务不存在或已过期: {source_task_id}",
  896. "overall_task_status": "failed",
  897. "error_code": "SOURCE_TASK_NOT_FOUND",
  898. "updated_at": int(time.time())
  899. }, ensure_ascii=False)
  900. yield format_sse_event("error", error_data)
  901. return StreamingResponse(
  902. error_not_found(),
  903. media_type="text/event-stream",
  904. headers={
  905. "Cache-Control": "no-cache",
  906. "Connection": "keep-alive",
  907. "X-Accel-Buffering": "no"
  908. }
  909. )
  910. # 获取原任务状态
  911. original_status = original_task.get("status") or original_task.get("overall_task_status", "unknown")
  912. logger.info(f"原任务状态: {source_task_id} = {original_status}")
  913. # 使用统一 SSE 管理器建立连接(使用新任务ID)
  914. queue = await unified_sse_manager.establish_connection(new_callback_task_id, sse_progress_callback)
  915. # ===== 3. 复用 generating_outline 的核心逻辑 =====
  916. async def generate_regenerate_events() -> AsyncGenerator[str, None]:
  917. """生成重新生成 SSE 事件流 - 复用 generating_outline 模式"""
  918. redis_check_client = None
  919. try:
  920. # ===== 3.1 初始化 Redis 连接(复用 generating_outline 模式)=====
  921. try:
  922. redis_check_client = AsyncRedis(
  923. host='127.0.0.1',
  924. port=6379,
  925. password='123456',
  926. db=0,
  927. decode_responses=True,
  928. socket_connect_timeout=2,
  929. socket_timeout=2
  930. )
  931. except Exception as e:
  932. logger.warning(f"[{new_callback_task_id}] 创建取消检查Redis连接失败: {e}")
  933. # 定义取消检查函数(复用 generating_outline 模式)
  934. async def is_task_cancelled() -> bool:
  935. """检查任务是否被取消"""
  936. if not redis_check_client or not new_callback_task_id:
  937. return False
  938. try:
  939. return await redis_check_client.exists(f"terminate:{new_callback_task_id}") > 0
  940. except Exception:
  941. return False
  942. # ===== 3.2 检查取消(复用 generating_outline 检查点1)=====
  943. if await is_task_cancelled():
  944. logger.info(f"[{new_callback_task_id}] 连接建立前检测到取消信号")
  945. cancelled_data = json.dumps({
  946. "callback_task_id": new_callback_task_id,
  947. "source_task_id": source_task_id,
  948. "user_id": user_id,
  949. "current": 0,
  950. "stage_name": "任务已取消",
  951. "status": "cancelled",
  952. "message": "任务已被用户取消",
  953. "overall_task_status": "cancelled",
  954. "updated_at": int(time.time())
  955. }, ensure_ascii=False)
  956. yield format_sse_event("cancelled", cancelled_data)
  957. return
  958. # ===== 3.3 发送连接确认(复用 generating_outline 模式)=====
  959. connected_data = json.dumps({
  960. "callback_task_id": new_callback_task_id,
  961. "source_task_id": source_task_id,
  962. "user_id": user_id,
  963. "current": 0,
  964. "stage_name": "连接建立",
  965. "status": "connected",
  966. "message": f"SSE 连接已建立,正在启动重新生成任务(原任务: {source_task_id}, 状态: {original_status})...",
  967. "overall_task_status": "processing",
  968. "updated_at": int(time.time())
  969. }, ensure_ascii=False)
  970. yield format_sse_event("connected", connected_data)
  971. # ===== 3.4 构建任务信息(合并原任务数据 + 新配置)=====
  972. # 优先使用传入的 project_info,否则使用原任务的
  973. if request.project_info:
  974. base_info = request.project_info.base_info
  975. project_info_flat = {
  976. "project_name": base_info.project_name,
  977. "construct_location": base_info.construct_location,
  978. "engineering_type": base_info.engineering_type,
  979. "selectable": request.project_info.selectable or ""
  980. }
  981. else:
  982. project_info_flat = original_task.get("project_info", {})
  983. # 处理 generation_template
  984. if request.generation_template:
  985. outline_structure = [
  986. item.dict() if isinstance(item, TemplateStructureItem) else item
  987. for item in request.generation_template.structure
  988. ]
  989. template_alias = request.generation_template.alias or "default_template"
  990. else:
  991. # 从原任务提取模板结构
  992. outline_structure = original_task.get("generation_template", [])
  993. if not outline_structure:
  994. outline_structure = original_task.get("results", {}).get("outline_structure", [])
  995. template_alias = original_task.get("template_id", "default_template")
  996. # 处理 generation_chapterenum(可选,默认使用原任务)
  997. generation_chapterenum = request.generation_chapterenum
  998. if generation_chapterenum is None:
  999. generation_chapterenum = original_task.get("generation_chapterenum", [])
  1000. # 如果原任务也没有,则根据 regenerate_config.target_path 推断
  1001. if not generation_chapterenum and regenerate_config.get("target_path"):
  1002. target_path = regenerate_config.get("target_path")
  1003. # 内嵌:根据路径查找章节代码的逻辑
  1004. original_outline = original_task.get("results", {}).get("outline_structure", [])
  1005. chapter_code = None
  1006. if original_outline and target_path:
  1007. path_parts = target_path.split(".")
  1008. def search_in_nodes(nodes, depth=0):
  1009. if depth >= len(path_parts):
  1010. return None
  1011. target_index = path_parts[depth]
  1012. for node in nodes:
  1013. node_index = str(node.get("index", ""))
  1014. if node_index == target_index:
  1015. if depth == len(path_parts) - 1:
  1016. return node.get("code")
  1017. children = node.get("children", [])
  1018. if children:
  1019. result = search_in_nodes(children, depth + 1)
  1020. if result:
  1021. return result
  1022. return None
  1023. chapter_code = search_in_nodes(original_outline)
  1024. if chapter_code:
  1025. generation_chapterenum = [chapter_code]
  1026. # 构建完整任务信息(与 generating_outline 格式保持一致)
  1027. sgbx_task_info = {
  1028. "callback_task_id": new_callback_task_id,
  1029. "source_task_id": source_task_id, # 关联原任务
  1030. "user_id": user_id,
  1031. "project_info": project_info_flat,
  1032. "template_id": template_alias,
  1033. "generation_chapterenum": generation_chapterenum,
  1034. "generation_template": outline_structure,
  1035. "similarity_config": original_task.get("similarity_config", {
  1036. "topk_plans": 3,
  1037. "topk_fragments": 10,
  1038. "threshold": 0.75
  1039. }),
  1040. "knowledge_config": original_task.get("knowledge_config", {
  1041. "topk": 3,
  1042. "threshold": 0.75
  1043. }),
  1044. # 重新生成特有配置
  1045. "regenerate_config": regenerate_config,
  1046. "is_regenerate": True,
  1047. "original_task_status": original_status # 记录原任务状态
  1048. }
  1049. logger.info(f"重新生成任务信息构建完成: "
  1050. f"new_task_id={new_callback_task_id}, "
  1051. f"source_task_id={source_task_id}, "
  1052. f"target={regenerate_config.get('target_path', 'unknown')}, "
  1053. f"chapters={generation_chapterenum}")
  1054. # ===== 3.5 检查取消(复用 generating_outline 检查点2)=====
  1055. if await is_task_cancelled():
  1056. logger.info(f"[{new_callback_task_id}] 任务提交前检测到取消信号")
  1057. cancelled_data = json.dumps({
  1058. "callback_task_id": new_callback_task_id,
  1059. "source_task_id": source_task_id,
  1060. "user_id": user_id,
  1061. "current": 0,
  1062. "stage_name": "任务已取消",
  1063. "status": "cancelled",
  1064. "message": "任务已被用户取消",
  1065. "overall_task_status": "cancelled",
  1066. "updated_at": int(time.time())
  1067. }, ensure_ascii=False)
  1068. yield format_sse_event("cancelled", cancelled_data)
  1069. return
  1070. # ===== 3.6 发送处理中事件(复用 generating_outline 模式)=====
  1071. processing_data = json.dumps({
  1072. "callback_task_id": new_callback_task_id,
  1073. "source_task_id": source_task_id,
  1074. "user_id": user_id,
  1075. "current": 5,
  1076. "stage_name": "任务提交中",
  1077. "status": "processing",
  1078. "message": f"正在提交重新生成任务(目标: {regenerate_config.get('target_path', 'unknown')})...",
  1079. "overall_task_status": "processing",
  1080. "updated_at": int(time.time())
  1081. }, ensure_ascii=False)
  1082. yield format_sse_event("processing", processing_data)
  1083. # ===== 3.7 提交任务到 Celery(复用 generating_outline 模式)=====
  1084. celery_task_id = await workflow_manager.submit_outline_generation_task(sgbx_task_info)
  1085. logger.info(f"重新生成任务已提交: "
  1086. f"new_callback_task_id={new_callback_task_id}, "
  1087. f"celery_task_id={celery_task_id}")
  1088. # 发送任务提交成功事件
  1089. submitted_data = json.dumps({
  1090. "callback_task_id": new_callback_task_id,
  1091. "source_task_id": source_task_id,
  1092. "user_id": user_id,
  1093. "current": 10,
  1094. "stage_name": "任务已提交",
  1095. "status": "submitted",
  1096. "message": "重新生成任务已提交,正在执行...",
  1097. "overall_task_status": "processing",
  1098. "updated_at": int(time.time()),
  1099. "celery_task_id": celery_task_id
  1100. }, ensure_ascii=False)
  1101. yield format_sse_event("submitted", submitted_data)
  1102. # ===== 3.8 持续监听进度(完全复用 generating_outline 模式)=====
  1103. last_progress = 10
  1104. last_progress_data = None
  1105. last_event_type = "processing"
  1106. last_message = ""
  1107. no_change_count = 0
  1108. while True:
  1109. try:
  1110. # 检查取消(复用 generating_outline 检查点3)
  1111. if await is_task_cancelled():
  1112. logger.info(f"[{new_callback_task_id}] 进度轮询中检测到取消信号")
  1113. cancelled_data = json.dumps({
  1114. "callback_task_id": new_callback_task_id,
  1115. "source_task_id": source_task_id,
  1116. "user_id": user_id,
  1117. "current": last_progress,
  1118. "stage_name": "任务已取消",
  1119. "status": "cancelled",
  1120. "message": "任务已被用户取消",
  1121. "overall_task_status": "cancelled",
  1122. "updated_at": int(time.time())
  1123. }, ensure_ascii=False)
  1124. yield format_sse_event("cancelled", cancelled_data)
  1125. return
  1126. # 从 Redis 获取最新进度
  1127. progress_data = await progress_manager.get_progress(new_callback_task_id)
  1128. if progress_data:
  1129. current_progress = progress_data.get("current", last_progress)
  1130. current_event_type = progress_data.get("event_type", "processing")
  1131. current_message = progress_data.get("message", "")
  1132. # 检查进度数据中是否已经是取消状态
  1133. if progress_data.get("overall_task_status") == "cancelled":
  1134. logger.info(f"[{new_callback_task_id}] 从进度数据检测到取消状态")
  1135. yield format_sse_event("cancelled", json.dumps(progress_data, ensure_ascii=False))
  1136. return
  1137. # 进度有变化时推送
  1138. should_push = False
  1139. if current_progress != last_progress:
  1140. should_push = True
  1141. elif current_event_type != last_event_type:
  1142. should_push = True
  1143. elif current_message != last_message:
  1144. should_push = True
  1145. elif last_progress_data is None:
  1146. should_push = True
  1147. elif progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status"):
  1148. should_push = True
  1149. if should_push:
  1150. last_progress = current_progress
  1151. last_event_type = current_event_type
  1152. last_message = current_message
  1153. last_progress_data = progress_data
  1154. yield format_sse_event("processing", json.dumps(progress_data, ensure_ascii=False))
  1155. no_change_count = 0
  1156. else:
  1157. no_change_count += 1
  1158. # 检查任务状态
  1159. status = progress_data.get("overall_task_status")
  1160. # 检测到取消立即返回
  1161. if status == "cancelled":
  1162. logger.info(f"[{new_callback_task_id}] 检测到任务已取消")
  1163. yield format_sse_event("cancelled", json.dumps(progress_data, ensure_ascii=False))
  1164. return
  1165. # 检查任务是否完成
  1166. if status in ["completed", "failed", "terminated"]:
  1167. break
  1168. await asyncio.sleep(0.5)
  1169. # 每 6 秒发送一次心跳
  1170. if no_change_count >= 30:
  1171. heartbeat_data = json.dumps({
  1172. "callback_task_id": new_callback_task_id,
  1173. "source_task_id": source_task_id,
  1174. "user_id": user_id,
  1175. "current": last_progress,
  1176. "stage_name": "执行中",
  1177. "status": "processing",
  1178. "message": "重新生成任务正在执行中...",
  1179. "overall_task_status": "processing",
  1180. "updated_at": int(time.time())
  1181. }, ensure_ascii=False)
  1182. yield format_sse_event("heartbeat", heartbeat_data)
  1183. no_change_count = 0
  1184. except Exception as e:
  1185. logger.warning(f"轮询进度异常: {new_callback_task_id}, 错误: {str(e)}")
  1186. await asyncio.sleep(0.5)
  1187. # ===== 3.9 获取最终结果(复用 generating_outline 模式)=====
  1188. final_result = await workflow_manager.get_outline_sgbx_task_info(new_callback_task_id)
  1189. # 检查取消(复用 generating_outline 检查点4)
  1190. if await is_task_cancelled():
  1191. logger.info(f"[{new_callback_task_id}] 结果返回前检测到取消信号")
  1192. cancelled_data = json.dumps({
  1193. "callback_task_id": new_callback_task_id,
  1194. "source_task_id": source_task_id,
  1195. "user_id": user_id,
  1196. "current": last_progress,
  1197. "stage_name": "任务已取消",
  1198. "status": "cancelled",
  1199. "message": "任务已被用户取消",
  1200. "overall_task_status": "cancelled",
  1201. "updated_at": int(time.time())
  1202. }, ensure_ascii=False)
  1203. yield format_sse_event("cancelled", cancelled_data)
  1204. return
  1205. # 检查任务结果是否为已取消
  1206. if final_result and final_result.get("status") == "cancelled":
  1207. logger.info(f"[{new_callback_task_id}] 任务结果状态为已取消,不返回实际结果")
  1208. cancelled_data = json.dumps({
  1209. "callback_task_id": new_callback_task_id,
  1210. "source_task_id": source_task_id,
  1211. "user_id": user_id,
  1212. "current": last_progress,
  1213. "stage_name": "任务已取消",
  1214. "status": "cancelled",
  1215. "message": final_result.get("message", "任务已被用户取消"),
  1216. "overall_task_status": "cancelled",
  1217. "updated_at": int(time.time())
  1218. }, ensure_ascii=False)
  1219. yield format_sse_event("cancelled", cancelled_data)
  1220. return
  1221. # ===== 3.10 返回最终结果(复用 generating_outline 模式)=====
  1222. if final_result and final_result.get("status") == "completed":
  1223. completed_data = json.dumps({
  1224. "callback_task_id": new_callback_task_id,
  1225. "source_task_id": source_task_id,
  1226. "user_id": user_id,
  1227. "current": 100,
  1228. "stage_name": "重新生成完成",
  1229. "status": "completed",
  1230. "message": "大纲重新生成任务已完成",
  1231. "overall_task_status": "completed",
  1232. "updated_at": int(time.time()),
  1233. "result": {
  1234. "outline_structure": final_result.get("results", {}).get("outline_structure", []),
  1235. "similar_plan": final_result.get("results", {}).get("similar_plan", [])
  1236. }
  1237. }, ensure_ascii=False)
  1238. yield format_sse_event("completed", completed_data)
  1239. else:
  1240. failed_data = json.dumps({
  1241. "callback_task_id": new_callback_task_id,
  1242. "source_task_id": source_task_id,
  1243. "user_id": user_id,
  1244. "current": last_progress,
  1245. "stage_name": "任务失败",
  1246. "status": "failed",
  1247. "message": final_result.get("results", {}).get("error", "重新生成任务失败") if final_result else "任务执行失败",
  1248. "overall_task_status": "failed",
  1249. "updated_at": int(time.time())
  1250. }, ensure_ascii=False)
  1251. yield format_sse_event("failed", failed_data)
  1252. except Exception as e:
  1253. logger.error(f"重新生成大纲 SSE 事件流错误: {str(e)}", exc_info=True)
  1254. error_data = json.dumps({
  1255. "callback_task_id": new_callback_task_id,
  1256. "source_task_id": source_task_id,
  1257. "user_id": user_id,
  1258. "current": 0,
  1259. "stage_name": "系统错误",
  1260. "status": "error",
  1261. "message": f"系统错误: {str(e)}",
  1262. "overall_task_status": "failed",
  1263. "updated_at": int(time.time())
  1264. }, ensure_ascii=False)
  1265. yield format_sse_event("error", error_data)
  1266. finally:
  1267. # 关闭 Redis 连接
  1268. if redis_check_client:
  1269. try:
  1270. await redis_check_client.close()
  1271. except Exception:
  1272. pass
  1273. # 关闭 SSE 连接
  1274. await unified_sse_manager.close_connection(new_callback_task_id)
  1275. return StreamingResponse(
  1276. generate_regenerate_events(),
  1277. media_type="text/event-stream",
  1278. headers={
  1279. "Cache-Control": "no-cache",
  1280. "Connection": "keep-alive",
  1281. "X-Accel-Buffering": "no"
  1282. }
  1283. )
  1284. # ==================== POST 接口 ====================
  1285. @outline_router.post("/task_cancel")
  1286. @auto_trace(generate_if_missing=True)
  1287. async def task_cancel(request: TaskCancelRequest):
  1288. """
  1289. 取消大纲生成任务
  1290. 【修复】现在支持取消预注册状态(pending)的任务,即任务提交后、Worker 执行前的时间段。
  1291. """
  1292. import redis.asyncio as redis_async
  1293. from redis.asyncio.connection import ConnectionPool
  1294. redis_client = None
  1295. try:
  1296. logger.info(f"接收取消任务请求: task_id={request.task_id}")
  1297. # 【修复】优先使用 workflow_manager 获取任务信息(支持预注册状态)
  1298. task_info = await workflow_manager.get_outline_sgbx_task_info(request.task_id)
  1299. if not task_info:
  1300. return {
  1301. "code": 404,
  1302. "message": "任务不存在",
  1303. "data": {"task_id": request.task_id, "error_type": "TASK_NOT_FOUND"}
  1304. }
  1305. # 检查任务状态
  1306. task_status = task_info.get("status") or task_info.get("overall_task_status", "unknown")
  1307. is_pre_registered = task_info.get("is_pre_registered", False)
  1308. if task_status == "cancelled":
  1309. return {
  1310. "code": 200,
  1311. "message": "任务已处于取消状态",
  1312. "data": {"task_id": request.task_id, "status": "cancelled"}
  1313. }
  1314. if task_status in ["completed", "failed"]:
  1315. return {
  1316. "code": 400,
  1317. "message": f"任务已完成,无法取消",
  1318. "data": {"task_id": request.task_id, "current_status": task_status}
  1319. }
  1320. # 【修复】使用 workflow_manager 的 set_outline_terminate_signal 方法
  1321. # 支持 pending(预注册)和 processing(执行中)两种状态
  1322. result = await workflow_manager.set_outline_terminate_signal(
  1323. callback_task_id=request.task_id,
  1324. operator=request.user_id
  1325. )
  1326. if not result.get("success"):
  1327. logger.warning(f"设置终止信号失败: {result.get('message')}")
  1328. return {
  1329. "code": 400,
  1330. "message": result.get("message", "取消任务失败"),
  1331. "data": {"task_id": request.task_id, "current_status": task_status}
  1332. }
  1333. cancelled_at = int(time.time())
  1334. # 【修复】如果是预注册状态(pending),任务已被直接取消
  1335. if is_pre_registered or task_status == "pending":
  1336. logger.info(f"预注册任务已被取消: {request.task_id}")
  1337. # 更新进度信息
  1338. try:
  1339. await progress_manager.update_stage_progress(
  1340. callback_task_id=request.task_id,
  1341. overall_task_status="cancelled",
  1342. status="cancelled",
  1343. message=f"任务已被用户取消: {request.cancel_reason}"
  1344. )
  1345. except Exception as e:
  1346. logger.warning(f"更新进度信息失败: {e}")
  1347. return {
  1348. "code": 200,
  1349. "message": "任务已成功取消",
  1350. "data": {
  1351. "task_id": request.task_id,
  1352. "status": "cancelled",
  1353. "cancelled_at": cancelled_at,
  1354. "cancel_reason": request.cancel_reason,
  1355. "cancelled_by": request.user_id,
  1356. "is_pre_registered": True
  1357. }
  1358. }
  1359. # 对于正在执行的任务(processing),设置额外的取消标志
  1360. try:
  1361. pool = ConnectionPool(
  1362. host='127.0.0.1',
  1363. port=6379,
  1364. password='123456',
  1365. db=0,
  1366. decode_responses=True,
  1367. max_connections=20,
  1368. socket_connect_timeout=10,
  1369. socket_timeout=10,
  1370. retry_on_timeout=True,
  1371. health_check_interval=30
  1372. )
  1373. redis_client = redis_async.Redis(connection_pool=pool)
  1374. terminate_data = json.dumps({
  1375. "cancelled": True,
  1376. "cancelled_by": request.user_id,
  1377. "cancel_reason": request.cancel_reason,
  1378. "cancelled_at": cancelled_at
  1379. })
  1380. pipe = redis_client.pipeline()
  1381. pipe.set(f"terminate:{request.task_id}", terminate_data, ex=3600)
  1382. pipe.hset(f"progress:{request.task_id}", mapping={
  1383. "overall_task_status": "cancelled",
  1384. "status": "cancelled",
  1385. "message": f"任务已被用户取消: {request.cancel_reason}",
  1386. "cancelled_at": str(cancelled_at),
  1387. "updated_at": str(cancelled_at)
  1388. })
  1389. await pipe.execute()
  1390. logger.info(f"终止标志已设置: {request.task_id}")
  1391. await redis_client.close()
  1392. await pool.disconnect()
  1393. redis_client = None
  1394. except Exception as e:
  1395. logger.error(f"设置终止标志失败: {e}")
  1396. # 不影响主流程,继续执行
  1397. # 尝试终止 Celery 任务
  1398. celery_task_id = task_info.get("celery_task_id") or task_info.get("celery_id")
  1399. if celery_task_id:
  1400. try:
  1401. from celery import current_app as celery_app
  1402. celery_app.control.revoke(celery_task_id, terminate=True)
  1403. logger.info(f"Celery 终止信号已发送: {celery_task_id}")
  1404. except Exception as e:
  1405. logger.warning(f"终止 Celery 任务失败: {e}")
  1406. # 关闭 SSE 连接
  1407. try:
  1408. cancel_event = {
  1409. "callback_task_id": request.task_id,
  1410. "status": "cancelled",
  1411. "overall_task_status": "cancelled",
  1412. "message": f"任务已被用户取消: {request.cancel_reason}",
  1413. "cancelled_at": cancelled_at,
  1414. "cancelled_by": request.user_id
  1415. }
  1416. await unified_sse_manager.send_progress(request.task_id, cancel_event)
  1417. await unified_sse_manager.close_connection(request.task_id)
  1418. except Exception as e:
  1419. logger.warning(f"关闭 SSE 失败: {e}")
  1420. return {
  1421. "code": 200,
  1422. "message": "任务取消成功",
  1423. "data": {
  1424. "task_id": request.task_id,
  1425. "status": "cancelled",
  1426. "cancelled_at": cancelled_at,
  1427. "cancel_reason": request.cancel_reason,
  1428. "cancelled_by": request.user_id
  1429. }
  1430. }
  1431. except Exception as e:
  1432. logger.error(f"取消任务异常: {str(e)}", exc_info=True)
  1433. return {
  1434. "code": 500,
  1435. "message": f"取消任务失败: {str(e)}",
  1436. "data": {"task_id": request.task_id}
  1437. }
  1438. finally:
  1439. # 关闭连接池
  1440. if redis_client:
  1441. try:
  1442. await redis_client.close()
  1443. except:
  1444. pass
  1445. # ==================== 查询接口 ====================
  1446. @outline_router.get("/task_status")
  1447. @auto_trace(generate_if_missing=True)
  1448. async def task_status(
  1449. task_id: str = Query(..., description="任务ID"),
  1450. user_id: str = Query(..., description="用户ID")
  1451. ):
  1452. """
  1453. 查询大纲生成任务状态
  1454. Args:
  1455. task_id: 任务回调ID
  1456. user_id: 用户ID
  1457. Returns:
  1458. dict: 任务状态信息
  1459. """
  1460. try:
  1461. logger.info(f"查询任务状态: task_id={task_id}")
  1462. # 获取任务信息
  1463. sgbx_task_info = await workflow_manager.get_outline_sgbx_task_info(task_id)
  1464. if sgbx_task_info is None:
  1465. return {
  1466. "code": 404,
  1467. "message": "任务不存在或已完成",
  1468. "data": None
  1469. }
  1470. return {
  1471. "code": 200,
  1472. "message": "查询成功",
  1473. "data": sgbx_task_info
  1474. }
  1475. except Exception as e:
  1476. logger.error(f"查询任务状态失败: {str(e)}", exc_info=True)
  1477. raise HTTPException(status_code=500, detail=f"查询任务状态失败: {str(e)}")
  1478. @outline_router.get("/active_tasks")
  1479. @auto_trace(generate_if_missing=True)
  1480. async def active_tasks(
  1481. user_id: str = Query(None, description="用户ID(可选,不提供则返回所有任务)")
  1482. ):
  1483. """
  1484. 获取活跃的大纲生成任务列表
  1485. Args:
  1486. user_id: 用户ID(可选)
  1487. Returns:
  1488. dict: 活跃任务列表
  1489. """
  1490. try:
  1491. logger.info(f"获取活跃任务列表: user_id={user_id}")
  1492. # 获取所有活跃任务
  1493. active_tasks_list = await workflow_manager.get_outline_active_tasks()
  1494. # 如果指定了用户ID,则过滤
  1495. if user_id:
  1496. active_tasks_list = [task for task in active_tasks_list if task["user_id"] == user_id]
  1497. return {
  1498. "code": 200,
  1499. "message": "查询成功",
  1500. "data": {
  1501. "total": len(active_tasks_list),
  1502. "tasks": active_tasks_list
  1503. }
  1504. }
  1505. except Exception as e:
  1506. logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
  1507. raise HTTPException(status_code=500, detail=f"获取活跃任务列表失败: {str(e)}")
# Attach the context-generation routes to outline_router.
outline_router.include_router(context_generate_router)