""" 施工方案审查启动接口 接收审查配置参数,启动AI审查工作流 """ import uuid import time import json import asyncio import traceback from datetime import datetime from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field from fastapi import APIRouter, HTTPException, Query from fastapi.responses import StreamingResponse from core.base.redis_duplicate_checker import RedisDuplicateChecker from foundation.logger.loggering import server_logger as logger from foundation.trace.trace_context import TraceContext, auto_trace from foundation.utils.redis_utils import get_file_info from core.base.workflow_manager import WorkflowManager from core.base.progress_manager import ProgressManager, sse_callback_manager from views.construction_review.file_upload import validate_upload_parameters from .schemas.error_schemas import LaunchReviewErrors launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"]) duplicatechecker = RedisDuplicateChecker() # 初始化工作流管理器 workflow_manager = WorkflowManager( max_concurrent_docs=3, max_concurrent_reviews=5 ) # 初始化进度管理器 progress_manager = ProgressManager() async def sse_progress_callback(callback_task_id: str, current_data: dict): """SSE推送回调函数 - 接收进度更新并推送到客户端""" await sse_manager.send_progress(callback_task_id, current_data) class SimpleSSEManager: """SSE连接管理器 - 管理客户端SSE连接和消息推送""" def __init__(self): self.connections: Dict[str, asyncio.Queue] = {} async def connect(self, callback_task_id: str): """建立SSE连接 - 创建消息队列并发送连接确认""" queue = asyncio.Queue() self.connections[callback_task_id] = queue await queue.put({ "type": "connection_established", "callback_task_id": callback_task_id, "timestamp": datetime.now().isoformat() }) logger.info(f"SSE连接: {callback_task_id}") return queue async def disconnect(self, callback_task_id: str): """断开SSE连接 - 清理连接队列""" if callback_task_id in self.connections: del self.connections[callback_task_id] logger.info(f"SSE连接已断开: {callback_task_id}") async def send_progress(self, callback_task_id: str, current_data: dict): """发送进度更新 - 将进度数据放入队列推送给客户端""" queue = self.connections.get(callback_task_id) if queue: # 根据数据状态决定事件类型 event_type = "unit_review_update" if current_data.get("status") == "unit_review_update" else "progress_update" await queue.put({ "type": event_type, "data": current_data, "timestamp": datetime.now().isoformat() }) logger.debug(f"SSE进度已推送: {callback_task_id}, 事件类型: {event_type}") else: logger.warning(f"SSE连接已断开,跳过进度推送: {callback_task_id} - AI审查任务继续执行") sse_manager = SimpleSSEManager() def format_sse_event(event_type: str, data: str) -> str: """格式化SSE事件 - 按照SSE协议格式化事件数据""" lines = [ f"event: {event_type}", f"data: {data}", "", "" ] return "\n".join(lines) + "\n" class LaunchReviewRequest(BaseModel): """启动审查请求模型""" callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取") user_id: str = Field(..., description="用户标识") review_config: List[str] = Field( ..., description="审查配置列表,包含的项为启用状态" ) project_plan_type: str = Field( "bridge_up_part", description="工程方案类型,当前仅支持 bridge_up_part" ) class Config: extra = "forbid" # 禁止额外的字段 class LaunchReviewResponse(BaseModel): """启动审查响应模型""" code: int data: dict def validate_review_config(review_config: List[str]) -> None: """验证审查配置参数""" # 检查review_config是否为空 if not review_config or len(review_config) == 0: raise LaunchReviewErrors.enum_type_cannot_be_null() # 支持的审查项枚举值 supported_review_items = { 'sensitive_word_check', # 词句语法检查 'semantic_logic_check', # 语义逻辑审查 'completeness_check', # 条文完整性审查 'timeliness_check', # 时效性审查 'reference_check', # 规范性审查 'sensitive_words_check', # 敏感词审查 'mandatory_standards_check', # 强制性标准检查 'technical_parameters_check', # 技术参数精确检查 'design_values_check' # 设计值符合性检查 } # 检查是否包含不支持的审查项 unsupported_items = set(review_config) - supported_review_items if unsupported_items: raise LaunchReviewErrors.enum_type_invalid() def validate_project_plan_type(project_plan_type: str) -> None: """验证工程方案类型""" # 当前支持的工程方案类型 supported_types = {'bridge_up_part'} # 桥梁上部结构 if project_plan_type not in supported_types: raise LaunchReviewErrors.project_plan_type_invalid() @launch_review_router.post("/sse/launch_review") @auto_trace(generate_if_missing=True) async def launch_review_sse(request_data: LaunchReviewRequest): """ 启动施工方案审查并返回SSE进度流 Args: request_data: 启动审查请求参数 Returns: StreamingResponse: SSE事件流,包含任务启动状态和进度 """ callback_task_id = request_data.callback_task_id TraceContext.set_trace_id(callback_task_id) review_config = request_data.review_config project_plan_type = request_data.project_plan_type logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}") # 验证审查配置 validate_review_config(review_config) # 验证工程方案类型 validate_project_plan_type(project_plan_type) # 注册SSE回调 sse_callback_manager.register_callback(callback_task_id, sse_progress_callback) queue = await sse_manager.connect(callback_task_id) async def generate_launch_review_events(): """生成启动审查SSE事件流""" try: # 发送连接确认 connected_data = json.dumps({ "callback_task_id": callback_task_id, "stage": "startup", "message": "启动审查SSE连接已建立,正在处理请求...", "timestamp": datetime.now().isoformat() }, ensure_ascii=False) yield format_sse_event("connected", connected_data) # 处理启动审查逻辑 try: # 从callback_task_id中提取file_id (格式: file_id-timestamp) file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id logger.info(f"处理文件: {file_id}") # 发送处理状态 status_data = json.dumps({ "callback_task_id": callback_task_id, "stage": "validation", "message": f"正在验证文件信息: {file_id}", "timestamp": datetime.now().isoformat() }, ensure_ascii=False) yield format_sse_event("processing", status_data) # 验证任务ID是否存在且未过期 if not await duplicatechecker.is_valid_task_id(callback_task_id): raise LaunchReviewErrors.task_not_found_or_expired() # 检查任务是否已经被使用启动审查 if await duplicatechecker.is_task_already_used(callback_task_id): raise LaunchReviewErrors.task_already_exists() # 标记任务为已使用 await duplicatechecker.mark_task_as_used(callback_task_id) file_info = await get_file_info(file_id, include_content=True) if not file_info: logger.error(f"文件信息获取失败: {file_id}") raise LaunchReviewErrors.file_info_not_found() # 立即更新Redis中的callback_task_id为当前值 try: from foundation.utils.redis_utils import store_file_info await store_file_info(file_id, {'callback_task_id': callback_task_id}) logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}") except Exception as e: logger.warning(f"更新Redis中的callback_task_id失败: {e}") # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id file_info.update({ 'review_config': review_config, 'project_plan_type': project_plan_type, 'launched_at': int(time.time()), 'callback_task_id': callback_task_id # 确保使用当前正确的callback_task_id }) # 提交处理任务到工作流管理器 await workflow_manager.submit_task_processing(file_info) # 发送成功启动状态 success_data = json.dumps({ "callback_task_id": callback_task_id, "file_id": file_info['file_id'], "review_config": review_config, "project_plan_type": project_plan_type, "status": "submitted", "submitted_at": file_info['launched_at'], "message": "施工方案审查任务启动成功,请耐心等待结果...", "timestamp": datetime.now().isoformat() }, ensure_ascii=False) yield format_sse_event("submitted", success_data) # 继续监听工作流进度 logger.info(f"开始监听工作流进度: {callback_task_id}") while True: try: message = await queue.get() if message.get("type") == "progress_update": current_data = message.get("data") if current_data: progress_json = json.dumps(current_data, ensure_ascii=False) yield format_sse_event("progress", progress_json) elif message.get("type") == "unit_review_update": current_data = message.get("data") if current_data: unit_review_json = json.dumps(current_data, ensure_ascii=False) yield format_sse_event("unit_review", unit_review_json) # # 统一检查任务完成状态 # if 'current_data' in locals() and current_data: # overall_task_status = current_data.get("overall_task_status") # if overall_task_status in ["completed", "failed"]: # completion_data = { # "callback_task_id": callback_task_id, # "task_status": overall_task_status, # "overall_progress": current_data.get("current", 100), # "timestamp": datetime.now().isoformat(), # "message": "审查任务处理完成!" # } # completion_json = json.dumps(completion_data, ensure_ascii=False) # yield format_sse_event("completed", completion_json) # break except Exception as e: logger.error(f"队列消息处理异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") break except HTTPException as e: logger.error(f"HTTP异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") error_data = json.dumps({ "callback_task_id": callback_task_id, "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error", "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e), "timestamp": datetime.now().isoformat() }, ensure_ascii=False) yield format_sse_event("error", error_data) except Exception as e: logger.error(f"启动审查处理异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") error_data = json.dumps({ "callback_task_id": callback_task_id, "error": "internal_error", "message": f"服务端内部错误: {str(e)}", "timestamp": datetime.now().isoformat() }, ensure_ascii=False) yield format_sse_event("error", error_data) except Exception as e: logger.error(f"启动审查SSE事件流异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") error_data = json.dumps({ "callback_task_id": callback_task_id, "error": "sse_error", "message": f"SSE流异常: {str(e)}", "timestamp": datetime.now().isoformat() }, ensure_ascii=False) yield format_sse_event("error", error_data) finally: # 清理回调连接 sse_callback_manager.unregister_callback(callback_task_id) await sse_manager.disconnect(callback_task_id) logger.debug(f"启动审查SSE流已结束: {callback_task_id}") return StreamingResponse( generate_launch_review_events(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "X-Accel-Buffering": "no", "X-Content-Type-Options": "nosniff" } )