| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- """
- 施工方案审查启动接口
- 接收审查配置参数,启动AI审查工作流
- """
- import uuid
- import time
- import json
- import asyncio
- import traceback
- from datetime import datetime
- from typing import List, Optional, Dict, Any
- from pydantic import BaseModel, Field, validator
- from fastapi import APIRouter, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from core.base.redis_duplicate_checker import RedisDuplicateChecker
- from foundation.logger.loggering import server_logger as logger
- from foundation.trace.trace_context import TraceContext, auto_trace
- from foundation.utils.redis_utils import get_file_info,store_file_info
- from core.base.workflow_manager import WorkflowManager
- from core.base.progress_manager import ProgressManager, sse_callback_manager
- from core.base.sse_manager import unified_sse_manager
- from views.construction_review.file_upload import validate_upload_parameters
- from .schemas.error_schemas import LaunchReviewErrors
- launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"])
- duplicatechecker = RedisDuplicateChecker()
- # 初始化工作流管理器
- workflow_manager = WorkflowManager(
- max_concurrent_docs=3,
- max_concurrent_reviews=5
- )
- # 初始化进度管理器
- progress_manager = ProgressManager()
- async def sse_progress_callback(callback_task_id: str, current_data: dict):
- """SSE推送回调函数 - 接收进度更新并推送到客户端"""
- await unified_sse_manager.send_progress(callback_task_id, current_data)
- class SimpleSSEManager:
- """
- SSE连接管理器 - 兼容性包装器,委托给统一SSE管理器
- 注意: 此类保持向后兼容,建议直接使用 unified_sse_manager
- """
- async def connect(self, callback_task_id: str, callback_func=None):
- """建立SSE连接"""
- return await unified_sse_manager.establish_connection(callback_task_id, callback_func)
- async def disconnect(self, callback_task_id: str):
- """断开SSE连接"""
- await unified_sse_manager.close_connection(callback_task_id)
- async def send_progress(self, callback_task_id: str, current_data: dict):
- """发送进度消息"""
- await unified_sse_manager.send_progress(callback_task_id, current_data)
- # 创建兼容性实例
- sse_manager = SimpleSSEManager()
- def format_sse_event(event_type: str, data: str) -> str:
- """格式化SSE事件 - 按照SSE协议格式化事件数据"""
- lines = [
- f"event: {event_type}",
- f"data: {data}",
- "",
- ""
- ]
- return "\n".join(lines) + "\n"
- class LaunchReviewRequest(BaseModel):
- """启动审查请求模型"""
- callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取")
- user_id: str = Field(..., description="用户标识")
- tendency_review_role: str = Field(
- ...,
- description="倾向性审查角色,暂定为 default_role"
- )
- review_config: List[str] = Field(
- ...,
- description="审查配置列表,包含的项为启用状态"
- )
- project_plan_type: str = Field(
- "bridge_up_part",
- description="工程方案类型,当前仅支持 bridge_up_part"
- )
- class Config:
- extra = "forbid" # 禁止额外的字段
- class LaunchReviewResponse(BaseModel):
- """启动审查响应模型"""
- code: int
- data: dict
- def validate_review_config(review_config: List[str]) -> None:
- """验证审查配置参数"""
- # 检查review_config是否为空
- if not review_config or len(review_config) == 0:
- raise LaunchReviewErrors.enum_type_cannot_be_null()
- # 支持的审查项枚举值
- supported_review_items = {
- 'sensitive_word_check', # 词句语法检查
- 'semantic_logic_check', # 语义逻辑审查
- 'completeness_check', # 条文完整性审查
- 'timeliness_check', # 时效性审查
- 'reference_check', # 规范性审查
- 'sensitive_words_check', # 敏感词审查
- 'mandatory_standards_check', # 强制性标准检查
- 'technical_parameters_check', # 技术参数精确检查
- 'design_values_check' # 设计值符合性检查
- }
- # 检查是否包含不支持的审查项
- unsupported_items = set(review_config) - supported_review_items
- if unsupported_items:
- raise LaunchReviewErrors.enum_type_invalid()
- def validate_project_plan_type(project_plan_type: str) -> None:
- """验证工程方案类型"""
- # 当前支持的工程方案类型
- supported_types = {'bridge_up_part'} # 桥梁上部结构
- if project_plan_type not in supported_types:
- raise LaunchReviewErrors.project_plan_type_invalid()
- def validate_tendency_review_role(tendency_review_role: str) -> None:
- """验证倾向性审查角色"""
- # 当前支持的倾向性审查角色类型
- supported_roles = {
- 'default_role', # 默认角色
- }
- if tendency_review_role not in supported_roles:
- raise LaunchReviewErrors.tendency_review_role_invalid()
- def validate_user_id(user_id: str) -> None:
- """验证用户标识"""
- # 当前支持的用户标识列表
- supported_users = {
- 'user-001'
- }
- if user_id not in supported_users:
- raise LaunchReviewErrors.invalid_user()
- @launch_review_router.post("/sse/launch_review")
- @auto_trace(generate_if_missing=True)
- async def launch_review_sse(request_data: LaunchReviewRequest):
- """
- 启动施工方案审查并返回SSE进度流
- Args:
- request_data: 启动审查请求参数
- Returns:
- StreamingResponse: SSE事件流,包含任务启动状态和进度
- """
- callback_task_id = request_data.callback_task_id
- TraceContext.set_trace_id(callback_task_id)
- user_id = request_data.user_id
- review_config = request_data.review_config
- project_plan_type = request_data.project_plan_type
- tendency_review_role = request_data.tendency_review_role
- logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}, user_id={user_id}, tendency_review_role={tendency_review_role}")
- # 验证用户标识
- validate_user_id(user_id)
- # 验证审查配置
- validate_review_config(review_config)
- # 验证工程方案类型
- validate_project_plan_type(project_plan_type)
- # 验证倾向性审查角色
- validate_tendency_review_role(tendency_review_role)
- # 使用统一SSE管理器建立连接并注册回调
- queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
- async def generate_launch_review_events():
- """生成启动审查SSE事件流"""
- try:
- # 发送连接确认
- connected_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "启动审查SSE连接",
- "status": "connected",
- "message": "启动审查SSE连接已建立,正在处理请求...",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "issues": []
- }, ensure_ascii=False)
- yield format_sse_event("connected", connected_data)
- # 处理启动审查逻辑
- try:
- # 从callback_task_id中提取file_id (格式: file_id-timestamp)
- file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
- logger.info(f"处理文件: {file_id}")
- # 发送处理状态
- status_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 5,
- "stage_name": f"验证文件信息: {file_id}",
- "status": "processing",
- "message": f"正在验证文件信息: {file_id}",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "issues": []
- }, ensure_ascii=False)
- yield format_sse_event("processing", status_data)
- # 验证任务ID是否存在且未过期
- if not await duplicatechecker.is_valid_task_id(callback_task_id):
- raise LaunchReviewErrors.task_not_found_or_expired()
- # 检查任务是否已经被使用启动审查
- if await duplicatechecker.is_task_already_used(callback_task_id):
- raise LaunchReviewErrors.task_already_exists()
- # 标记任务为已使用
- await duplicatechecker.mark_task_as_used(callback_task_id)
- file_info = await get_file_info(file_id, include_content=True)
- if not file_info:
- logger.error(f"文件信息获取失败: {file_id}")
- raise LaunchReviewErrors.file_info_not_found()
- # 立即更新Redis中的callback_task_id为当前值
- try:
- await store_file_info(file_id, {'callback_task_id': callback_task_id})
- logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}")
- except Exception as e:
- logger.warning(f"更新Redis中的callback_task_id失败: {e}")
- # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id
- file_info.update({
- 'user_id': user_id,
- 'review_config': review_config,
- 'project_plan_type': project_plan_type,
- 'tendency_review_role': tendency_review_role,
- 'launched_at': int(time.time()),
- 'callback_task_id': callback_task_id # 确保使用当前正确的callback_task_id
- })
- # 提交处理任务到工作流管理器
- await workflow_manager.submit_task_processing(file_info)
- # 发送成功启动状态
- success_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 10,
- "stage_name": "任务启动成功",
- "status": "submitted",
- "message": "施工方案审查任务启动成功,请耐心等待结果...",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "issues": []
- }, ensure_ascii=False)
- yield format_sse_event("submitted", success_data)
- # 继续监听工作流进度
- logger.info(f"开始监听工作流进度: {callback_task_id}")
- while True:
- try:
- message = await queue.get()
- # 处理所有类型的进度更新消息
- message_type = message.get("type")
- current_data = message.get("data")
- if current_data:
- # 根据消息类型决定数据格式
- if message_type == "unit_review_update":
- # 单元审查更新的特殊格式
- unified_data = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": current_data.get("current", 0),
- "stage_name": current_data.get("stage_name", "单元审查"),
- "status": "unit_review_update",
- "message": current_data.get("message", ""),
- "overall_task_status": current_data.get("overall_task_status", "processing"),
- "updated_at": current_data.get("updated_at", int(time.time())),
- "issues": current_data.get("issues", [])
- }
- else:
- # 通用进度更新格式(包括 processing_flag, processing, completed 等)
- unified_data = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": current_data.get("current", 0),
- "stage_name": current_data.get("stage_name", "处理中"),
- "status": current_data.get("status", "processing"),
- "message": current_data.get("message", ""),
- "overall_task_status": current_data.get("overall_task_status", "processing"),
- "updated_at": current_data.get("updated_at", int(time.time())),
- "issues": current_data.get("issues", [])
- }
- # 使用从progress_manager传递的事件类型,或回退到消息类型
- sse_event_type = current_data.get("event_type", message_type)
- if not sse_event_type:
- sse_event_type = "processing" # 最终回退
- logger.debug(f"生成SSE事件: {sse_event_type}, 消息类型: {message_type}, current: {current_data.get('current')}")
- unified_data_json = json.dumps(unified_data, ensure_ascii=False)
- yield format_sse_event(sse_event_type, unified_data_json)
- # 调试日志:记录接收到的消息
- logger.debug(f"收到消息 - 类型: {message_type}, 数据: {current_data}")
- # 特殊处理:跳过连接建立消息,避免误判为完成
- if message_type == "connection_established":
- logger.info(f"收到连接建立消息,继续监听: {callback_task_id}")
- continue
- # 特殊处理:收到连接关闭信号,立即结束SSE流
- if message_type == "connection_closed":
- completion_data = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 100,
- "stage_name": "审查完成",
- "status": "completed",
- "message": f"施工审查方案处理完成!",
- "overall_task_status": "completed",
- "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()),
- }
- completion_json = json.dumps(completion_data, ensure_ascii=False)
- yield format_sse_event("completed", completion_json)
- logger.info(f"收到连接关闭信号,结束SSE流: {callback_task_id}")
- logger.info(f"SSE状态: SSE回调已注销")
- break
- except Exception as e:
- logger.error(f"队列消息处理异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- break
- except HTTPException as e:
- logger.error(f"HTTP异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "处理异常",
- "status": "error",
- "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
- "overall_task_status": "failed",
- "updated_at": int(time.time()),
- "issues": [],
- "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error"
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- except Exception as e:
- logger.error(f"启动审查处理异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "内部错误",
- "status": "error",
- "message": f"服务端内部错误: {str(e)}",
- "overall_task_status": "failed",
- "updated_at": int(time.time()),
- "issues": [],
- "error": "internal_error"
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- except Exception as e:
- logger.error(f"启动审查SSE事件流异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id if 'user_id' in locals() else "unknown",
- "current": 0,
- "stage_name": "SSE流异常",
- "status": "error",
- "message": f"SSE流异常: {str(e)}",
- "overall_task_status": "failed",
- "updated_at": int(time.time()),
- "issues": [],
- "error": "sse_error"
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- finally:
- # 清理回调连接(确保资源被正确释放)
- try:
- sse_callback_manager.unregister_callback(callback_task_id)
- except Exception as cleanup_error:
- logger.warning(f"清理回调连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
- try:
- await unified_sse_manager.close_connection(callback_task_id)
- except Exception as cleanup_error:
- logger.warning(f"断开SSE连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
- logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
- return StreamingResponse(
- generate_launch_review_events(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Connection": "keep-alive",
- "Access-Control-Allow-Origin": "*",
- "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type",
- "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
- "X-Accel-Buffering": "no",
- "X-Content-Type-Options": "nosniff"
- }
- )
|