| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851 |
- # -*- coding: utf-8 -*-
- """
- 大纲生成 API 接口 (SSE 版本)
- 集成到 Celery + WorkflowManager 架构中
- 提供以下接口:
- - SSE /sgbx/generating_outline: SSE 流式大纲生成
- - SSE /sgbx/regenerate_outline: SSE 流式重新生成
- - POST /sgbx/task_cancel: 取消大纲生成任务
- """
- import uuid
- import json
- import time
- import asyncio
- from typing import Optional, Dict, Any, List, AsyncGenerator, Union
- from pydantic import BaseModel, Field
- from fastapi import APIRouter, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from foundation.observability.logger.loggering import write_logger as logger
- from foundation.infrastructure.tracing import TraceContext, auto_trace
- from core.base.workflow_manager import WorkflowManager
- from core.base.sse_manager import unified_sse_manager
- from core.base.progress_manager import ProgressManager
- # 创建路由
- outline_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
- # 初始化工作流管理器
- workflow_manager = WorkflowManager(
- max_concurrent_docs=3,
- max_concurrent_reviews=5
- )
- # 初始化进度管理器
- progress_manager = ProgressManager()
- async def sse_progress_callback(callback_task_id: str, current_data: dict):
- """SSE 推送回调函数 - 接收进度更新并推送到客户端"""
- await unified_sse_manager.send_progress(callback_task_id, current_data)
- def format_sse_event(event_type: str, data: str) -> str:
- """格式化 SSE 事件 - 按照 SSE 协议格式化事件数据"""
- lines = [
- f"event: {event_type}",
- f"data: {data}",
- "",
- ""
- ]
- return "\n".join(lines) + "\n"
- # ==================== 请求/响应模型 ====================
- class BaseInfo(BaseModel):
- """项目基础信息"""
- project_name: str = Field(..., description="方案名称", example="罗成依达大桥上部结构专项施工方案")
- construct_location: str = Field(..., description="建设地点", example="四川省凉山州")
- engineering_type: str = Field(..., description="方案模版类型", example="T型梁")
- class ProjectInfo(BaseModel):
- """项目信息(嵌套结构)"""
- base_info: BaseInfo = Field(..., description="基础信息")
- selectable: Optional[str] = Field("", description="其他可选信息")
- class TemplateStructureItem(BaseModel):
- """模板结构项(支持嵌套children)"""
- index: str = Field(..., description="章节编号", example="2")
- level: int = Field(..., description="层级", ge=1, le=5)
- title: str = Field(..., description="章节标题", example="工程概况")
- code: str = Field(..., description="章节代码", example="overview")
- # 使用 Union 支持递归类型
- children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
- class GenerationTemplate(BaseModel):
- """大纲生成模板
- 示例:
- {
- "source_file": "方案编写助手原文关键词规范文档修改版-2026-2-5.md",
- "alias": "施工方案知识审查与编写体系",
- "structure": [
- {
- "index": "2",
- "level": 1,
- "title": "工程概况",
- "code": "overview",
- "children": [...]
- }
- ]
- }
- """
- source_file: Optional[str] = Field(None, description="源文件", example="方案编写助手原文关键词规范文档修改版-2026-2-5.md")
- alias: Optional[str] = Field(None, description="别名", example="施工方案知识审查与编写体系")
- structure: List[Union[TemplateStructureItem, Dict[str, Any]]] = Field(..., description="模板结构")
- class OutlineGenerationRequest(BaseModel):
- """大纲生成请求
- 示例请求体(适配curl示例):
- {
- "user_id": "user-001",
- "project_info": {
- "base_info": {
- "project_name": "罗成依达大桥上部结构专项施工方案",
- "construct_location": "四川省凉山州",
- "engineering_type": "T型梁"
- },
- "selectable": ""
- },
- "generation_template": {
- "source_file": "方案编写助手原文关键词规范文档修改版-2026-2-5.md",
- "alias": "施工方案知识审查与编写体系",
- "structure": [...]
- },
- "generation_chapterenum": ["overview_DesignSummary_ProjectIntroduction", ...]
- }
- """
- user_id: str = Field(..., description="用户标识", example="user-001")
- project_info: ProjectInfo = Field(..., description="项目基础信息")
- generation_template: GenerationTemplate = Field(..., description="大纲生成模板")
- generation_chapterenum: List[str] = Field(default_factory=list, description="生成章节代码列表,为空时生成全部章节")
- class RegenerateOutlineRequest(BaseModel):
- """重新生成大纲请求
- 复用大纲生成接口的请求定义,额外添加 regenerate_config 字段用于指定重新生成配置。
- project_info 和 generation_template 为可选字段,不传入则使用原任务的信息。
- 示例请求:
- {
- "task_id": "task-20250130-123456",
- "user_id": "user-001",
- "project_info": { // 可选,不传则使用原任务的项目信息
- "base_info": {
- "project_name": "罗成依达大桥上部结构专项施工方案",
- "construct_location": "四川省凉山州",
- "engineering_type": "T型梁"
- },
- "selectable": ""
- },
- "generation_template": { // 可选,不传则使用原任务的模板
- "source_file": "...",
- "alias": "...",
- "structure": [...]
- },
- "generation_chapterenum": ["overview_DesignSummary_MainTechnicalStandards"], // 可选
- "regenerate_config": {
- "regenerate_mode": "chapter",
- "target_path": "2.1",
- "preserve_children": true,
- "reason": "调整内容结构"
- }
- }
- """
- task_id: str = Field(..., description="原大纲生成任务ID")
- user_id: str = Field(..., description="用户ID")
- # 可选:复用大纲生成接口的项目信息(不传则使用原任务的)
- project_info: Optional[ProjectInfo] = Field(None, description="项目基础信息(可选)")
- # 可选:复用大纲生成接口的模板(不传则使用原任务的)
- generation_template: Optional[GenerationTemplate] = Field(None, description="大纲生成模板(可选)")
- # 可选:复用大纲生成接口的章节代码列表
- generation_chapterenum: Optional[List[str]] = Field(None, description="生成章节代码列表(可选)")
- # 重新生成特有的配置
- regenerate_config: Dict[str, Any] = Field(..., description="重新生成配置")
- class TaskCancelRequest(BaseModel):
- """任务取消请求"""
- task_id: str = Field(..., description="任务ID")
- user_id: str = Field(..., description="用户ID")
- cancel_reason: Optional[str] = Field("用户主动取消", description="取消原因")
- # ==================== 响应模型 ====================
- class OutlineNodeResponse(BaseModel):
- """大纲节点响应模型
- 与请求的 TemplateStructureItem 对应,增加以下字段:
- - 每级节点都包含 generated_content
- - 2级和3级节点包含 similar_fragments
- - 末级节点额外包含 key_points 和 knowledge_bases
- """
- index: str = Field(..., description="章节编号", example="2.1.1")
- level: int = Field(..., description="层级", ge=1, le=5, example=3)
- title: str = Field(..., description="章节标题", example="工程简介")
- code: str = Field(..., description="章节代码", example="overview_DesignSummary_ProjectIntroduction")
- generated_content: str = Field(..., description="AI生成的内容", example="罗成依达大桥位于四川省凉山州...")
- # 2级和3级节点包含 similar_fragments
- similar_fragments: Optional[List[Dict[str, Any]]] = Field(None, description="相似片段推荐(2级和3级节点)")
- key_points: Optional[List[str]] = Field(None, description="核心要点(仅末级节点)", example=["桥位位置", "桥梁规模"])
- knowledge_bases: Optional[List[str]] = Field(None, description="知识点/编制依据(仅末级节点)", example=["《公路桥涵设计通用规范》JTG D60-2015"])
- children: Optional[List[Dict[str, Any]]] = Field(None, description="子章节(递归结构)")
- class SimilarPlanResponse(BaseModel):
- """相似方案响应(整篇方案级推荐)"""
- plan_id: str = Field(..., description="方案ID")
- plan_title: str = Field(..., description="方案标题")
- similarity_score: float = Field(..., description="相似度分数", ge=0.0, le=1.0)
- plan_type: str = Field(..., description="方案类型")
- outline: Optional[List[Dict]] = Field(None, description="方案大纲结构")
- metadata: Optional[Dict[str, Any]] = Field(None, description="元数据")
- class SimilarFragmentResponse(BaseModel):
- """相似片段响应"""
- fragment_id: str = Field(..., description="片段ID")
- section_path: str = Field(..., description="所属章节路径")
- section_title: str = Field(..., description="章节标题")
- fragment_content: str = Field(..., description="片段内容")
- similarity_score: float = Field(..., description="相似度分数", ge=0.0, le=1.0)
- source_document_id: str = Field(..., description="来源文档ID")
- source_document_title: str = Field(..., description="来源文档标题")
- class OutlineGenerationResult(BaseModel):
- """大纲生成结果
- 结构说明:
- - outline_structure: 嵌套大纲结构,每个节点包含 generated_content
- - 2级和3级节点额外包含 similar_fragments
- - 末级节点额外包含 key_points 和 knowledge_bases
- - similar_plan: 整篇方案的相似方案推荐(顶层)
- """
- outline_structure: List[OutlineNodeResponse] = Field(..., description="大纲结构(包含AI生成内容和章节级similar_fragments)")
- similar_plan: List[SimilarPlanResponse] = Field(default_factory=list, description="相似方案推荐(整篇方案级)")
- # ==================== SSE 接口实现 ====================
- @outline_router.post("/generating_outline", response_model=None)
- @auto_trace(generate_if_missing=True)
- async def generating_outline(request: OutlineGenerationRequest):
- """
- 大纲生成接口 (SSE 流式响应)
- 流程:
- 1. 接收请求参数
- 2. 生成 callback_task_id
- 3. 提交任务到 Celery
- 4. 通过 SSE 流式返回进度和结果
- Args:
- request: 大纲生成请求参数
- Returns:
- StreamingResponse: SSE 事件流
- Response 格式说明:
- 完成时返回的 result 包含以下字段:
- - outline_structure: 与请求的 generation_template.structure 结构一致
- - 每级节点包含 generated_content
- - 2级和3级节点额外包含 similar_fragments(章节级相似片段)
- - 末级节点额外包含 key_points 和 knowledge_bases
- - similar_plan: 整篇方案的相似方案推荐(顶层)
- 示例响应结构:
- {
- "outline_structure": [
- {
- "index": "2",
- "level": 1,
- "title": "工程概况",
- "code": "overview",
- "generated_content": "本章主要介绍...",
- "children": [
- {
- "index": "2.1",
- "level": 2,
- "title": "设计概况",
- "code": "overview_DesignSummary",
- "generated_content": "本节介绍...",
- "similar_fragments": [...], // 2级节点有相似片段
- "children": [
- {
- "index": "2.1.1",
- "level": 3,
- "title": "工程简介",
- "code": "overview_DesignSummary_ProjectIntroduction",
- "generated_content": "罗成依达大桥位于...",
- "similar_fragments": [...], // 3级节点有相似片段
- "key_points": ["桥位位置", "桥梁规模"],
- "knowledge_bases": ["《公路桥涵设计通用规范》"]
- }
- ]
- }
- ]
- }
- ],
- "similar_plan": [...] // 整篇方案级相似推荐
- }
- """
- callback_task_id = f"outline_{uuid.uuid4().hex[:16]}"
- TraceContext.set_trace_id(callback_task_id)
- user_id = request.user_id
- logger.info(f"接收大纲生成 SSE 请求: user_id={user_id}, project={request.project_info.base_info.project_name}")
- # 使用统一 SSE 管理器建立连接并注册回调
- queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
- async def generate_outline_events() -> AsyncGenerator[str, None]:
- """生成大纲生成 SSE 事件流"""
- try:
- # 发送连接确认事件
- connected_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "连接建立",
- "status": "connected",
- "message": "SSE 连接已建立,正在启动大纲生成任务...",
- "overall_task_status": "processing",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("connected", connected_data)
- # 构建任务信息(适配新的请求格式)
- # 提取项目基础信息
- base_info = request.project_info.base_info
- project_info_flat = {
- "project_name": base_info.project_name,
- "construct_location": base_info.construct_location,
- "engineering_type": base_info.engineering_type,
- "selectable": request.project_info.selectable or ""
- }
- # 构建任务信息
- sgbx_task_info = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "project_info": project_info_flat,
- #"generation_template": request.generation_template.dict(),
- "template_id": request.generation_template.alias or "default_template",
- "generation_chapterenum": request.generation_chapterenum,
- "generation_template": [
- item.dict() if isinstance(item, TemplateStructureItem) else item
- for item in request.generation_template.structure
- ],
- "similarity_config": {
- "topk_plans": 3,
- "topk_fragments": 10,
- "threshold": 0.75
- },
- "knowledge_config": {
- "topk": 3,
- "threshold": 0.75
- },
- }
- # 发送处理中事件
- processing_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 5,
- "stage_name": "任务提交中",
- "status": "processing",
- "message": "正在提交大纲生成任务...",
- "overall_task_status": "processing",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("processing", processing_data)
- # 提交任务到 Celery
- celery_task_id = await workflow_manager.submit_outline_generation_task(sgbx_task_info)
- logger.info(f"大纲生成任务已提交: callback_task_id={callback_task_id}, celery_task_id={celery_task_id}")
- # 发送任务提交成功事件
- submitted_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 10,
- "stage_name": "任务已提交",
- "status": "submitted",
- "message": "大纲生成任务已提交,正在执行...",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "celery_task_id": celery_task_id
- }, ensure_ascii=False)
- yield format_sse_event("submitted", submitted_data)
- # 持续监听进度并转发(从 Redis 轮询,支持跨进程)
- last_progress = 10
- last_progress_data = None
- no_change_count = 0
- while True:
- try:
- # 从 Redis 获取最新进度(支持跨进程)
- progress_data = await progress_manager.get_progress(callback_task_id)
- if progress_data:
- current_progress = progress_data.get("current", last_progress)
- # 进度有变化或状态变化时推送
- if (current_progress != last_progress or
- progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status") if last_progress_data else True):
- last_progress = current_progress
- last_progress_data = progress_data
- # 转发进度事件
- yield format_sse_event("processing", json.dumps(progress_data, ensure_ascii=False))
- no_change_count = 0
- else:
- no_change_count += 1
- # 检查任务是否完成
- if progress_data.get("overall_task_status") in ["completed", "failed", "terminated"]:
- break
- await asyncio.sleep(0.5)
- # 每 6 秒发送一次心跳(无进度变化时,6秒 = 30 * 200ms)
- if no_change_count >= 30:
- heartbeat_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": last_progress,
- "stage_name": "执行中",
- "status": "processing",
- "message": "大纲生成任务正在执行中...",
- "overall_task_status": "processing",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("heartbeat", heartbeat_data)
- no_change_count = 0
- except Exception as e:
- logger.warning(f"轮询进度异常: {callback_task_id}, 错误: {str(e)}")
- await asyncio.sleep(0.5)
- # 获取最终结果
- final_result = await workflow_manager.get_outline_sgbx_task_info(callback_task_id)
- if final_result and final_result.get("status") == "completed":
- # 发送完成事件
- # 新的响应格式:
- # - outline_structure: 嵌套结构,每个节点包含 generated_content
- # - 2级和3级节点额外包含 similar_fragments
- # - 末级节点额外包含 key_points 和 knowledge_bases
- # - similar_plan: 整篇方案的相似方案推荐(顶层)
- completed_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 100,
- "stage_name": "大纲生成完成",
- "status": "completed",
- "message": "大纲生成任务已完成",
- "overall_task_status": "completed",
- "updated_at": int(time.time()),
- "result": {
- "outline_structure": final_result.get("results", {}).get("outline_structure", []),
- "similar_plan": final_result.get("results", {}).get("similar_plan", [])
- }
- }, ensure_ascii=False)
- yield format_sse_event("completed", completed_data)
- else:
- # 发送失败事件
- failed_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": last_progress,
- "stage_name": "任务失败",
- "status": "failed",
- "message": final_result.get("results", {}).get("error", "大纲生成任务失败") if final_result else "任务执行失败",
- "overall_task_status": "failed",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("failed", failed_data)
- except Exception as e:
- logger.error(f"大纲生成 SSE 事件流错误: {str(e)}", exc_info=True)
- # 发送错误事件
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "系统错误",
- "status": "error",
- "message": f"系统错误: {str(e)}",
- "overall_task_status": "failed",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- finally:
- # 关闭 SSE 连接
- await unified_sse_manager.close_connection(callback_task_id)
- return StreamingResponse(
- generate_outline_events(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "X-Accel-Buffering": "no"
- }
- )
- @outline_router.post("/regenerate_outline", response_model=None)
- @auto_trace(generate_if_missing=True)
- async def regenerate_outline(request: RegenerateOutlineRequest):
- """
- 重新生成大纲接口 (SSE 流式响应)
- 按章或按节重新生成大纲内容。
- 复用大纲生成接口的请求定义,project_info 和 generation_template 为可选字段,
- 不传入则使用原任务的信息。
- Args:
- request: 重新生成请求参数,包含:
- - task_id: 原任务ID
- - user_id: 用户ID
- - project_info: 可选,项目基础信息(不传则使用原任务的)
- - generation_template: 可选,大纲生成模板(不传则使用原任务的)
- - regenerate_config: 重新生成配置
- Returns:
- StreamingResponse: SSE 事件流
- 示例请求:
- {
- "task_id": "task-20250130-123456",
- "user_id": "user-001",
- "generation_chapterenum": ["overview_DesignSummary_MainTechnicalStandards"],
- "regenerate_config": {
- "regenerate_mode": "chapter",
- "target_path": "2.1",
- "preserve_children": true,
- "reason": "调整内容结构"
- }
- }
- """
- callback_task_id = request.task_id
- TraceContext.set_trace_id(callback_task_id)
- user_id = request.user_id
- regenerate_config = request.regenerate_config
- logger.info(f"接收重新生成大纲 SSE 请求: task_id={callback_task_id}, user_id={user_id}, "
- f"target={regenerate_config.get('target_path', 'unknown')}")
- # 使用统一 SSE 管理器建立连接
- queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
- async def generate_regenerate_events() -> AsyncGenerator[str, None]:
- """生成重新生成 SSE 事件流"""
- try:
- # 发送连接确认事件
- connected_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "连接建立",
- "status": "connected",
- "message": "SSE 连接已建立,正在启动重新生成任务...",
- "overall_task_status": "processing",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("connected", connected_data)
- # 获取原任务信息
- original_task = await workflow_manager.get_outline_sgbx_task_info(callback_task_id)
- if not original_task:
- # 发送错误事件
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "任务不存在",
- "status": "error",
- "message": "原任务不存在或已过期",
- "overall_task_status": "failed",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- return
- # 发送处理中事件
- processing_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 5,
- "stage_name": "准备重新生成",
- "status": "processing",
- "message": f"正在准备重新生成: {regenerate_config.get('target_path', 'unknown')}...",
- "overall_task_status": "processing",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("processing", processing_data)
- # 构建任务信息
- # 优先使用传入的 project_info 和 generation_template,否则使用原任务的
- if request.project_info:
- base_info = request.project_info.base_info
- project_info_flat = {
- "project_name": base_info.project_name,
- "construct_location": base_info.construct_location,
- "engineering_type": base_info.engineering_type,
- "selectable": request.project_info.selectable or ""
- }
- else:
- # 使用原任务的项目信息
- project_info_flat = original_task.get("project_info", {})
- # 处理 generation_template 和 generation_chapterenum
- generation_template = request.generation_template
- if generation_template:
- template_data = generation_template.dict()
- outline_structure = template_data.get("structure", [])
- else:
- # 使用原任务的模板信息
- template_data = original_task.get("generation_template", {})
- outline_structure = original_task.get("outline_structure", [])
- # 优先使用传入的 generation_chapterenum,否则使用原任务的
- generation_chapterenum = request.generation_chapterenum if request.generation_chapterenum is not None else original_task.get("generation_chapterenum", [])
- # 构建任务信息(与大纲生成接口保持一致)
- sgbx_task_info = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "project_info": project_info_flat,
- "template_id": template_data.get("alias", "default_template") if template_data else "default_template",
- "generation_chapterenum": generation_chapterenum,
- "generation_template": outline_structure if outline_structure else template_data.get("structure", []),
- "similarity_config": {
- "topk_plans": 3,
- "topk_fragments": 10,
- "threshold": 0.75
- },
- "knowledge_config": {
- "topk": 3,
- "threshold": 0.75
- },
- # 重新生成特有字段
- "regenerate_config": regenerate_config,
- "is_regenerate": True
- }
- logger.info(f"重新生成任务信息构建完成: task_id={callback_task_id}, "
- f"target={regenerate_config.get('target_path', 'unknown')}, "
- f"chapters={generation_chapterenum}")
- # 模拟重新生成过程
- for progress in [20, 40, 60, 80]:
- await asyncio.sleep(0.5) # 模拟处理时间
- progress_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": progress,
- "stage_name": "重新生成中",
- "status": "processing",
- "message": f"正在重新生成章节: {regenerate_config.get('target_path', 'unknown')}...",
- "overall_task_status": "processing",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("processing", progress_data)
- # 发送完成事件
- # 响应格式与大纲生成接口保持一致
- completed_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 100,
- "stage_name": "重新生成完成",
- "status": "completed",
- "message": "大纲重新生成已完成",
- "overall_task_status": "completed",
- "updated_at": int(time.time()),
- "result": {
- # 复用大纲生成接口的响应格式
- # outline_structure: 仅包含重新生成的章节及其子章节
- # 结构与大纲生成接口一致,包含 generated_content/similar_fragments/key_points/knowledge_bases
- "outline_structure": [
- {
- "index": regenerate_config.get("target_path", "unknown"),
- "level": len(regenerate_config.get("target_path", "").split(".")),
- "title": f"重新生成的章节 {regenerate_config.get('target_path', 'unknown')}",
- "code": generation_chapterenum[0] if generation_chapterenum else "",
- "generated_content": "重新生成的内容...",
- "similar_fragments": [], # 2级/3级节点包含
- "key_points": ["要点1", "要点2"], # 末级节点包含
- "knowledge_bases": ["规范1", "规范2"] # 末级节点包含
- }
- ],
- "similar_plan": [] # 整篇方案级相似推荐
- }
- }, ensure_ascii=False)
- yield format_sse_event("completed", completed_data)
- except Exception as e:
- logger.error(f"重新生成大纲 SSE 事件流错误: {str(e)}", exc_info=True)
- # 发送错误事件
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "系统错误",
- "status": "error",
- "message": f"系统错误: {str(e)}",
- "overall_task_status": "failed",
- "updated_at": int(time.time())
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- finally:
- # 关闭 SSE 连接
- await unified_sse_manager.close_connection(callback_task_id)
- return StreamingResponse(
- generate_regenerate_events(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "X-Accel-Buffering": "no"
- }
- )
- # ==================== POST 接口 ====================
- @outline_router.post("/task_cancel")
- @auto_trace(generate_if_missing=True)
- async def task_cancel(request: TaskCancelRequest):
- """
- 取消大纲生成任务
- 设置任务终止信号,任务将在当前节点完成后终止
- Args:
- request: 任务取消请求参数
- Returns:
- dict: 操作结果
- """
- try:
- logger.info(f"接收取消任务请求: task_id={request.task_id}, user_id={request.user_id}")
- # 设置终止信号
- result = await workflow_manager.set_outline_terminate_signal(
- callback_task_id=request.task_id,
- operator=request.user_id
- )
- return {
- "code": 200 if result["success"] else 400,
- "message": result["message"],
- "data": result.get("sgbx_task_info")
- }
- except Exception as e:
- logger.error(f"取消任务失败: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"取消任务失败: {str(e)}")
- # ==================== 查询接口 ====================
- @outline_router.get("/task_status")
- @auto_trace(generate_if_missing=True)
- async def task_status(
- task_id: str = Query(..., description="任务ID"),
- user_id: str = Query(..., description="用户ID")
- ):
- """
- 查询大纲生成任务状态
- Args:
- task_id: 任务回调ID
- user_id: 用户ID
- Returns:
- dict: 任务状态信息
- """
- try:
- logger.info(f"查询任务状态: task_id={task_id}")
- # 获取任务信息
- sgbx_task_info = await workflow_manager.get_outline_sgbx_task_info(task_id)
- if sgbx_task_info is None:
- return {
- "code": 404,
- "message": "任务不存在或已完成",
- "data": None
- }
- return {
- "code": 200,
- "message": "查询成功",
- "data": sgbx_task_info
- }
- except Exception as e:
- logger.error(f"查询任务状态失败: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"查询任务状态失败: {str(e)}")
- @outline_router.get("/active_tasks")
- @auto_trace(generate_if_missing=True)
- async def active_tasks(
- user_id: str = Query(None, description="用户ID(可选,不提供则返回所有任务)")
- ):
- """
- 获取活跃的大纲生成任务列表
- Args:
- user_id: 用户ID(可选)
- Returns:
- dict: 活跃任务列表
- """
- try:
- logger.info(f"获取活跃任务列表: user_id={user_id}")
- # 获取所有活跃任务
- active_tasks_list = await workflow_manager.get_outline_active_tasks()
- # 如果指定了用户ID,则过滤
- if user_id:
- active_tasks_list = [task for task in active_tasks_list if task["user_id"] == user_id]
- return {
- "code": 200,
- "message": "查询成功",
- "data": {
- "total": len(active_tasks_list),
- "tasks": active_tasks_list
- }
- }
- except Exception as e:
- logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
- raise HTTPException(status_code=500, detail=f"获取活跃任务列表失败: {str(e)}")
|