""" 施工方案审查启动接口 接收审查配置参数,启动AI审查工作流 """ import uuid import time import json import asyncio import traceback from datetime import datetime from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field, validator from fastapi import APIRouter, HTTPException, Query from fastapi.responses import StreamingResponse from core.base.redis_duplicate_checker import RedisDuplicateChecker from foundation.logger.loggering import server_logger as logger from foundation.trace.trace_context import TraceContext, auto_trace from foundation.utils.redis_utils import get_file_info,store_file_info from core.base.workflow_manager import WorkflowManager from core.base.progress_manager import ProgressManager, sse_callback_manager from views.construction_review.file_upload import validate_upload_parameters from .schemas.error_schemas import LaunchReviewErrors launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"]) duplicatechecker = RedisDuplicateChecker() # 初始化工作流管理器 workflow_manager = WorkflowManager( max_concurrent_docs=3, max_concurrent_reviews=5 ) # 初始化进度管理器 progress_manager = ProgressManager() async def sse_progress_callback(callback_task_id: str, current_data: dict): """SSE推送回调函数 - 接收进度更新并推送到客户端""" await sse_manager.send_progress(callback_task_id, current_data) class SimpleSSEManager: """SSE连接管理器 - 管理客户端SSE连接和消息推送""" def __init__(self): self.connections: Dict[str, asyncio.Queue] = {} async def connect(self, callback_task_id: str): """建立SSE连接 - 创建消息队列并发送连接确认""" queue = asyncio.Queue() self.connections[callback_task_id] = queue await queue.put({ "type": "connection_established", "callback_task_id": callback_task_id, "timestamp": datetime.now().isoformat() }) logger.info(f"SSE连接: {callback_task_id}") return queue async def disconnect(self, callback_task_id: str): """断开SSE连接 - 清理连接队列""" if callback_task_id in self.connections: del self.connections[callback_task_id] logger.info(f"SSE连接已断开: {callback_task_id}") async def send_progress(self, callback_task_id: str, current_data: dict): """发送进度更新 - 将进度数据放入队列推送给客户端""" queue = self.connections.get(callback_task_id) if queue: # 优先使用progress_manager传递的event_type,如果没有则使用默认逻辑 event_type = current_data.get("event_type", "processing") # 处理特殊的单元审查事件 if event_type == "unit_review" or (event_type == "processing" and current_data.get("status") == "unit_review_update"): event_type = "unit_review_update" await queue.put({ "type": event_type, "data": current_data, "timestamp": datetime.now().isoformat() }) logger.debug(f"SSE进度已推送: {callback_task_id}, 事件类型: {event_type}") else: logger.warning(f"SSE连接已断开,跳过进度推送: {callback_task_id} - AI审查任务继续执行") sse_manager = SimpleSSEManager() def format_sse_event(event_type: str, data: str) -> str: """格式化SSE事件 - 按照SSE协议格式化事件数据""" lines = [ f"event: {event_type}", f"data: {data}", "", "" ] return "\n".join(lines) + "\n" class LaunchReviewRequest(BaseModel): """启动审查请求模型""" callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取") user_id: str = Field(..., description="用户标识") tendency_review_role: str = Field( ..., description="倾向性审查角色,暂定为 default_role" ) review_config: List[str] = Field( ..., description="审查配置列表,包含的项为启用状态" ) project_plan_type: str = Field( "bridge_up_part", description="工程方案类型,当前仅支持 bridge_up_part" ) class Config: extra = "forbid" # 禁止额外的字段 class LaunchReviewResponse(BaseModel): """启动审查响应模型""" code: int data: dict def validate_review_config(review_config: List[str]) -> None: """验证审查配置参数""" # 检查review_config是否为空 if not review_config or len(review_config) == 0: raise LaunchReviewErrors.enum_type_cannot_be_null() # 支持的审查项枚举值 supported_review_items = { 'sensitive_word_check', # 词句语法检查 'semantic_logic_check', # 语义逻辑审查 'completeness_check', # 条文完整性审查 'timeliness_check', # 时效性审查 'reference_check', # 规范性审查 'sensitive_words_check', # 敏感词审查 'mandatory_standards_check', # 强制性标准检查 'technical_parameters_check', # 技术参数精确检查 'design_values_check' # 设计值符合性检查 } # 检查是否包含不支持的审查项 unsupported_items = set(review_config) - supported_review_items if unsupported_items: raise LaunchReviewErrors.enum_type_invalid() def validate_project_plan_type(project_plan_type: str) -> None: """验证工程方案类型""" # 当前支持的工程方案类型 supported_types = {'bridge_up_part'} # 桥梁上部结构 if project_plan_type not in supported_types: raise LaunchReviewErrors.project_plan_type_invalid() def validate_tendency_review_role(tendency_review_role: str) -> None: """验证倾向性审查角色""" # 当前支持的倾向性审查角色类型 supported_roles = { 'default_role', # 默认角色 } if tendency_review_role not in supported_roles: raise LaunchReviewErrors.tendency_review_role_invalid() def validate_user_id(user_id: str) -> None: """验证用户标识""" # 当前支持的用户标识列表 supported_users = { 'user-001' } if user_id not in supported_users: raise LaunchReviewErrors.invalid_user() @launch_review_router.post("/sse/launch_review") @auto_trace(generate_if_missing=True) async def launch_review_sse(request_data: LaunchReviewRequest): """ 启动施工方案审查并返回SSE进度流 Args: request_data: 启动审查请求参数 Returns: StreamingResponse: SSE事件流,包含任务启动状态和进度 """ callback_task_id = request_data.callback_task_id TraceContext.set_trace_id(callback_task_id) user_id = request_data.user_id review_config = request_data.review_config project_plan_type = request_data.project_plan_type tendency_review_role = request_data.tendency_review_role logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}, user_id={user_id}, tendency_review_role={tendency_review_role}") # 验证用户标识 validate_user_id(user_id) # 验证审查配置 validate_review_config(review_config) # 验证工程方案类型 validate_project_plan_type(project_plan_type) # 验证倾向性审查角色 validate_tendency_review_role(tendency_review_role) # 注册SSE回调 sse_callback_manager.register_callback(callback_task_id, sse_progress_callback) queue = await sse_manager.connect(callback_task_id) async def generate_launch_review_events(): """生成启动审查SSE事件流""" try: # 发送连接确认 connected_data = json.dumps({ "callback_task_id": callback_task_id, "user_id": user_id, "current": 0, "stage_name": "启动审查SSE连接", "status": "connected", "message": "启动审查SSE连接已建立,正在处理请求...", "overall_task_status": "processing", "updated_at": int(time.time()), "issues": [] }, ensure_ascii=False) yield format_sse_event("connected", connected_data) # 处理启动审查逻辑 try: # 从callback_task_id中提取file_id (格式: file_id-timestamp) file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id logger.info(f"处理文件: {file_id}") # 发送处理状态 status_data = json.dumps({ "callback_task_id": callback_task_id, "user_id": user_id, "current": 5, "stage_name": f"验证文件信息: {file_id}", "status": "processing", "message": f"正在验证文件信息: {file_id}", "overall_task_status": "processing", "updated_at": int(time.time()), "issues": [] }, ensure_ascii=False) yield format_sse_event("processing", status_data) # 验证任务ID是否存在且未过期 if not await duplicatechecker.is_valid_task_id(callback_task_id): raise LaunchReviewErrors.task_not_found_or_expired() # 检查任务是否已经被使用启动审查 if await duplicatechecker.is_task_already_used(callback_task_id): raise LaunchReviewErrors.task_already_exists() # 标记任务为已使用 await duplicatechecker.mark_task_as_used(callback_task_id) file_info = await get_file_info(file_id, include_content=True) if not file_info: logger.error(f"文件信息获取失败: {file_id}") raise LaunchReviewErrors.file_info_not_found() # 立即更新Redis中的callback_task_id为当前值 try: await store_file_info(file_id, {'callback_task_id': callback_task_id}) logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}") except Exception as e: logger.warning(f"更新Redis中的callback_task_id失败: {e}") # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id file_info.update({ 'user_id': user_id, 'review_config': review_config, 'project_plan_type': project_plan_type, 'tendency_review_role': tendency_review_role, 'launched_at': int(time.time()), 'callback_task_id': callback_task_id # 确保使用当前正确的callback_task_id }) # 提交处理任务到工作流管理器 await workflow_manager.submit_task_processing(file_info) # 发送成功启动状态 success_data = json.dumps({ "callback_task_id": callback_task_id, "user_id": user_id, "current": 10, "stage_name": "任务启动成功", "status": "submitted", "message": "施工方案审查任务启动成功,请耐心等待结果...", "overall_task_status": "processing", "updated_at": int(time.time()), "issues": [] }, ensure_ascii=False) yield format_sse_event("submitted", success_data) # 继续监听工作流进度 logger.info(f"开始监听工作流进度: {callback_task_id}") while True: try: message = await queue.get() # 处理所有类型的进度更新消息 message_type = message.get("type") current_data = message.get("data") if current_data: # 根据消息类型决定数据格式 if message_type == "unit_review_update": # 单元审查更新的特殊格式 unified_data = { "callback_task_id": callback_task_id, "user_id": user_id, "current": current_data.get("current", 0), "stage_name": current_data.get("stage_name", "单元审查"), "status": "unit_review_update", "message": current_data.get("message", ""), "overall_task_status": current_data.get("overall_task_status", "processing"), "updated_at": current_data.get("updated_at", int(time.time())), "issues": current_data.get("issues", []) } else: # 通用进度更新格式(包括 processing_flag, processing, completed 等) unified_data = { "callback_task_id": callback_task_id, "user_id": user_id, "current": current_data.get("current", 0), "stage_name": current_data.get("stage_name", "处理中"), "status": current_data.get("status", "processing"), "message": current_data.get("message", ""), "overall_task_status": current_data.get("overall_task_status", "processing"), "updated_at": current_data.get("updated_at", int(time.time())), "issues": current_data.get("issues", []) } # 使用从progress_manager传递的事件类型,或回退到消息类型 sse_event_type = current_data.get("event_type", message_type) if not sse_event_type: sse_event_type = "processing" # 最终回退 logger.debug(f"生成SSE事件: {sse_event_type}, 消息类型: {message_type}, current: {current_data.get('current')}") unified_data_json = json.dumps(unified_data, ensure_ascii=False) yield format_sse_event(sse_event_type, unified_data_json) # 检查SSE回调是否已被注销(作为任务结束信号) if not sse_callback_manager.is_callback_registered(callback_task_id): logger.info(f"检测到SSE回调已注销,任务结束: {callback_task_id}") # 推送最终的completed事件 completion_data = { "callback_task_id": callback_task_id, "user_id": user_id, "current": 100, "stage_name": "审查完成", "status": "completed", "message": "施工审查方案处理完成!", "overall_task_status": "completed", "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()), } completion_json = json.dumps(completion_data, ensure_ascii=False) yield format_sse_event("completed", completion_json) break except Exception as e: logger.error(f"队列消息处理异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") break except HTTPException as e: logger.error(f"HTTP异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") error_data = json.dumps({ "callback_task_id": callback_task_id, "user_id": user_id, "current": 0, "stage_name": "处理异常", "status": "error", "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e), "overall_task_status": "failed", "updated_at": int(time.time()), "issues": [], "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error" }, ensure_ascii=False) yield format_sse_event("error", error_data) except Exception as e: logger.error(f"启动审查处理异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") error_data = json.dumps({ "callback_task_id": callback_task_id, "user_id": user_id, "current": 0, "stage_name": "内部错误", "status": "error", "message": f"服务端内部错误: {str(e)}", "overall_task_status": "failed", "updated_at": int(time.time()), "issues": [], "error": "internal_error" }, ensure_ascii=False) yield format_sse_event("error", error_data) except Exception as e: logger.error(f"启动审查SSE事件流异常: {callback_task_id}") logger.error(f"异常详情: {str(e)}") logger.error(f"异常堆栈: {traceback.format_exc()}") error_data = json.dumps({ "callback_task_id": callback_task_id, "user_id": user_id if 'user_id' in locals() else "unknown", "current": 0, "stage_name": "SSE流异常", "status": "error", "message": f"SSE流异常: {str(e)}", "overall_task_status": "failed", "updated_at": int(time.time()), "issues": [], "error": "sse_error" }, ensure_ascii=False) yield format_sse_event("error", error_data) finally: # 清理回调连接 sse_callback_manager.unregister_callback(callback_task_id) await sse_manager.disconnect(callback_task_id) logger.debug(f"启动审查SSE流已结束: {callback_task_id}") return StreamingResponse( generate_launch_review_events(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "X-Accel-Buffering": "no", "X-Content-Type-Options": "nosniff" } )