| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- """
- 施工方案审查启动接口
- 接收审查配置参数,启动AI审查工作流
- """
- import uuid
- import time
- import json
- import asyncio
- import traceback
- from datetime import datetime
- from typing import List, Optional, Dict, Any
- from pydantic import BaseModel, Field
- from fastapi import APIRouter, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from core.base.redis_duplicate_checker import RedisDuplicateChecker
- from foundation.logger.loggering import server_logger as logger
- from foundation.trace.trace_context import TraceContext, auto_trace
- from foundation.utils.redis_utils import get_file_info, delete_file_info
- from core.base.workflow_manager import WorkflowManager
- from core.base.progress_manager import ProgressManager, sse_callback_manager
- from views.construction_review.file_upload import validate_upload_parameters
- from .schemas.error_schemas import LaunchReviewErrors
- launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"])
- duplicatechecker = RedisDuplicateChecker()
- # 初始化工作流管理器
- workflow_manager = WorkflowManager(
- max_concurrent_docs=3,
- max_concurrent_reviews=5
- )
- # 初始化进度管理器
- progress_manager = ProgressManager()
- async def sse_progress_callback(callback_task_id: str, current_data: dict):
- """SSE推送回调函数 - 接收进度更新并推送到客户端"""
- await sse_manager.send_progress(callback_task_id, current_data)
- class SimpleSSEManager:
- """SSE连接管理器 - 管理客户端SSE连接和消息推送"""
- def __init__(self):
- self.connections: Dict[str, asyncio.Queue] = {}
- async def connect(self, callback_task_id: str):
- """建立SSE连接 - 创建消息队列并发送连接确认"""
- queue = asyncio.Queue()
- self.connections[callback_task_id] = queue
- await queue.put({
- "type": "connection_established",
- "callback_task_id": callback_task_id,
- "timestamp": datetime.now().isoformat()
- })
- logger.info(f"SSE连接: {callback_task_id}")
- return queue
- async def disconnect(self, callback_task_id: str):
- """断开SSE连接 - 清理连接队列"""
- if callback_task_id in self.connections:
- del self.connections[callback_task_id]
- logger.info(f"SSE连接已断开: {callback_task_id}")
- async def send_progress(self, callback_task_id: str, current_data: dict):
- """发送进度更新 - 将进度数据放入队列推送给客户端"""
- queue = self.connections.get(callback_task_id)
- if queue:
- await queue.put({
- "type": "progress_update",
- "data": current_data,
- "timestamp": datetime.now().isoformat()
- })
- logger.debug(f"SSE进度已推送: {callback_task_id}")
- sse_manager = SimpleSSEManager()
- def format_sse_event(event_type: str, data: str) -> str:
- """格式化SSE事件 - 按照SSE协议格式化事件数据"""
- lines = [
- f"event: {event_type}",
- f"data: {data}",
- "",
- ""
- ]
- return "\n".join(lines) + "\n"
- class LaunchReviewRequest(BaseModel):
- """启动审查请求模型"""
- callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取")
- review_config: List[str] = Field(
- ...,
- description="审查配置列表,包含的项为启用状态"
- )
- project_plan_type: str = Field(
- "bridge_up_part",
- description="工程方案类型,当前仅支持 bridge_up_part"
- )
- class Config:
- extra = "forbid" # 禁止额外的字段
- class LaunchReviewResponse(BaseModel):
- """启动审查响应模型"""
- code: int
- data: dict
- def validate_review_config(review_config: List[str]) -> None:
- """验证审查配置参数"""
- # 检查review_config是否为空
- if not review_config or len(review_config) == 0:
- raise LaunchReviewErrors.enum_type_cannot_be_null()
- # 支持的审查项枚举值
- supported_review_items = {
- 'sensitive_word_check', # 词句语法检查
- 'semantic_logic_check', # 语义逻辑审查
- 'completeness_check', # 条文完整性审查
- 'timeliness_check', # 时效性审查
- 'reference_check', # 规范性审查
- 'sensitive_words_check', # 敏感词审查
- 'mandatory_standards_check', # 强制性标准检查
- 'technical_parameters_check', # 技术参数精确检查
- 'design_values_check' # 设计值符合性检查
- }
- # 检查是否包含不支持的审查项
- unsupported_items = set(review_config) - supported_review_items
- if unsupported_items:
- raise LaunchReviewErrors.enum_type_invalid()
- def validate_project_plan_type(project_plan_type: str) -> None:
- """验证工程方案类型"""
- # 当前支持的工程方案类型
- supported_types = {'bridge_up_part'} # 桥梁上部结构
- if project_plan_type not in supported_types:
- raise LaunchReviewErrors.project_plan_type_invalid()
- @launch_review_router.post("/sse/launch_review")
- @auto_trace(generate_if_missing=True)
- async def launch_review_sse(request_data: LaunchReviewRequest):
- """
- 启动施工方案审查并返回SSE进度流
- Args:
- request_data: 启动审查请求参数
- Returns:
- StreamingResponse: SSE事件流,包含任务启动状态和进度
- """
- callback_task_id = request_data.callback_task_id
- TraceContext.set_trace_id(callback_task_id)
- review_config = request_data.review_config
- project_plan_type = request_data.project_plan_type
- logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}")
- # 验证审查配置
- validate_review_config(review_config)
- # 验证工程方案类型
- validate_project_plan_type(project_plan_type)
- # 注册SSE回调
- sse_callback_manager.register_callback(callback_task_id, sse_progress_callback)
- queue = await sse_manager.connect(callback_task_id)
- async def generate_launch_review_events():
- """生成启动审查SSE事件流"""
- try:
- # 发送连接确认
- connected_data = json.dumps({
- "callback_task_id": callback_task_id,
- "message": "启动审查SSE连接已建立,正在处理请求...",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("connected", connected_data)
- # 处理启动审查逻辑
- try:
- from foundation.utils.redis_utils import get_file_info
- # 从callback_task_id中提取file_id (格式: file_id-timestamp)
- file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
- # 发送处理状态
- status_data = json.dumps({
- "callback_task_id": callback_task_id,
- "stage": "validation",
- "message": f"正在验证文件信息: {file_id}",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("processing", status_data)
- # 检查重复任务
- if await duplicatechecker.is_duplicate_task(file_id):
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "error": "task_already_exists",
- "message": "任务已存在,请勿重复提交",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- return
- # 获取文件信息
- status_data = json.dumps({
- "callback_task_id": callback_task_id,
- "stage": "loading",
- "message": "正在加载文件信息...",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("processing", status_data)
- file_info = await get_file_info(file_id, include_content=True)
- if not file_info:
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "error": "task_not_found",
- "message": "任务ID不存在或已过期",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- return
- # 验证必要的字段
- if 'file_content' not in file_info:
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "error": "missing_content",
- "message": "文件内容缺失",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- return
- # 添加审查配置到文件信息
- file_info.update({
- 'review_config': review_config,
- 'project_plan_type': project_plan_type,
- 'launched_at': int(time.time())
- })
- # 发送提交任务状态
- status_data = json.dumps({
- "callback_task_id": callback_task_id,
- "stage": "submitting",
- "message": "正在提交AI审查任务...",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("processing", status_data)
- # 提交处理任务到工作流管理器
- task_id = await workflow_manager.submit_task_processing(file_info)
- # 发送成功启动状态
- success_data = json.dumps({
- "callback_task_id": callback_task_id,
- "task_id": task_id,
- "file_id": file_info['file_id'],
- "review_config": review_config,
- "project_plan_type": project_plan_type,
- "status": "submitted",
- "submitted_at": file_info['launched_at'],
- "message": "AI审查任务已成功启动",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("submitted", success_data)
- # 继续监听工作流进度
- logger.info(f"开始监听工作流进度: {callback_task_id}")
- while True:
- try:
- message = await queue.get()
- if message.get("type") == "progress_update":
- current_data = message.get("data")
- if current_data:
- progress_json = json.dumps(current_data, ensure_ascii=False)
- yield format_sse_event("progress", progress_json)
- overall_task_status = current_data.get("overall_task_status")
- if overall_task_status in ["completed", "failed"]:
- completion_data = {
- "callback_task_id": callback_task_id,
- "task_status": overall_task_status,
- "overall_progress": current_data.get("current", 100),
- "timestamp": datetime.now().isoformat(),
- "message": "审查任务处理完成!"
- }
- completion_json = json.dumps(completion_data, ensure_ascii=False)
- yield format_sse_event("completed", completion_json)
- break
- except Exception as e:
- logger.error(f"队列消息处理异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- break
- except HTTPException as e:
- logger.error(f"HTTP异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error",
- "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- except Exception as e:
- logger.error(f"启动审查处理异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "error": "internal_error",
- "message": f"服务端内部错误: {str(e)}",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- except Exception as e:
- logger.error(f"启动审查SSE事件流异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "error": "sse_error",
- "message": f"SSE流异常: {str(e)}",
- "timestamp": datetime.now().isoformat()
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- finally:
- # 清理回调连接
- sse_callback_manager.unregister_callback(callback_task_id)
- await sse_manager.disconnect(callback_task_id)
- logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
- return StreamingResponse(
- generate_launch_review_events(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Connection": "keep-alive",
- "Access-Control-Allow-Origin": "*",
- "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type",
- "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
- "X-Accel-Buffering": "no",
- "X-Content-Type-Options": "nosniff"
- }
- )
- @launch_review_router.get("/sse/launch_review_status")
- async def get_launch_review_sse_status():
- """获取启动审查SSE连接状态 - 返回当前活跃的启动审查SSE连接信息"""
- return {
- "active_connections": len(sse_manager.connections),
- "connections": list(sse_manager.connections.keys()),
- "timestamp": datetime.now().isoformat(),
- "service": "launch_review_sse"
- }
|