""" 施工方案审查启动接口 接收审查配置参数,启动AI审查工作流 """ import uuid import time from datetime import datetime from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field from fastapi import APIRouter, HTTPException from core.base.redis_duplicate_checker import RedisDuplicateChecker from foundation.logger.loggering import server_logger as logger from foundation.trace.trace_context import TraceContext, auto_trace from foundation.utils.redis_utils import get_file_info, delete_file_info from core.base.workflow_manager import WorkflowManager from views.construction_review.file_upload import validate_upload_parameters from .schemas.error_schemas import LaunchReviewErrors launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"]) duplicatechecker = RedisDuplicateChecker() # 初始化工作流管理器 workflow_manager = WorkflowManager( max_concurrent_docs=3, max_concurrent_reviews=5 ) class LaunchReviewRequest(BaseModel): """启动审查请求模型""" callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取") review_config: List[str] = Field( ..., description="审查配置列表,包含的项为启用状态" ) project_plan_type: str = Field( "bridge_up_part", description="工程方案类型,当前仅支持 bridge_up_part" ) class Config: extra = "forbid" # 禁止额外的字段 class LaunchReviewResponse(BaseModel): """启动审查响应模型""" code: int data: dict def validate_review_config(review_config: List[str]) -> None: """验证审查配置参数""" # 检查review_config是否为空 if not review_config or len(review_config) == 0: raise LaunchReviewErrors.enum_type_cannot_be_null() # 支持的审查项枚举值 supported_review_items = { 'sensitive_word_check', # 词句语法检查 'semantic_logic_check', # 语义逻辑审查 'completeness_check', # 条文完整性审查 'timeliness_check', # 时效性审查 'reference_check', # 规范性审查 'sensitive_words_check', # 敏感词审查 'mandatory_standards_check', # 强制性标准检查 'technical_parameters_check', # 技术参数精确检查 'design_values_check' # 设计值符合性检查 } # 检查是否包含不支持的审查项 unsupported_items = set(review_config) - supported_review_items if unsupported_items: raise LaunchReviewErrors.enum_type_invalid() def validate_project_plan_type(project_plan_type: str) -> None: """验证工程方案类型""" # 当前支持的工程方案类型 supported_types = {'bridge_up_part'} # 桥梁上部结构 if project_plan_type not in supported_types: raise LaunchReviewErrors.project_plan_type_invalid() @launch_review_router.post("/sse/launch_review", response_model=LaunchReviewResponse) @auto_trace(generate_if_missing=True) async def launch_review(request_data: LaunchReviewRequest): """ 启动施工方案审查 Args: request_data: 启动审查请求参数 Returns: LaunchReviewResponse: 包含任务ID的响应 """ try: callback_task_id = request_data.callback_task_id review_config = request_data.review_config project_plan_type = request_data.project_plan_type logger.info(f"收到审查启动请求: callback_task_id={callback_task_id}") # 验证审查配置 validate_review_config(review_config) # 验证工程方案类型 validate_project_plan_type(project_plan_type) try: # 从callback_task_id中提取file_id (格式: file_id-timestamp) file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id # 检查重复任务 if await duplicatechecker.is_duplicate_task(file_id): raise LaunchReviewErrors.task_already_exists() # 获取文件信息(确保包含文件内容) file_info = await get_file_info(file_id, include_content=True) if not file_info: raise LaunchReviewErrors.task_not_found() # 验证必要的字段是否存在 if 'file_content' not in file_info: logger.error(f"文件信息中缺少file_content字段,可用字段: {list(file_info.keys())}") raise LaunchReviewErrors.task_not_found() # 添加审查配置到文件信息 file_info.update({ 'review_config': review_config, 'project_plan_type': project_plan_type, 'launched_at': int(time.time()) }) logger.info(f"获取到文件信息: file_id={file_id}, 包含字段: {list(file_info.keys())}") logger.info(f"文件内容大小: {len(file_info.get('file_content', b''))} bytes") # 注意:暂不删除Redis缓存,让工作流处理完成后再清理 # await delete_file_info(file_id) logger.info(f"保留Redis缓存供工作流使用: file_info:{file_id}") except Exception as e: logger.error(f"获取文件信息失败: {str(e)}") raise LaunchReviewErrors.file_info_not_found(e) # 提交处理任务到工作流管理器 task_id = await workflow_manager.submit_task_processing(file_info) except HTTPException: # 重新抛出HTTP异常 raise except Exception as e: logger.error(f"启动审查失败: {str(e)}") raise LaunchReviewErrors.internal_error(e)