| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597 |
- """
- 施工方案审查启动接口
- 接收审查配置参数,启动AI审查工作流
- """
- import uuid
- import time
- import json
- import asyncio
- import traceback
- from datetime import datetime
- from typing import List, Optional, Dict, Any
- from pydantic import BaseModel, Field, validator
- from fastapi import APIRouter, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from core.base.redis_duplicate_checker import RedisDuplicateChecker
- from foundation.observability.logger.loggering import server_logger as logger
- from foundation.infrastructure.tracing import TraceContext, auto_trace
- from foundation.utils.redis_utils import get_file_info,store_file_info
- from core.base.workflow_manager import WorkflowManager
- from core.base.progress_manager import ProgressManager, sse_callback_manager
- from core.base.sse_manager import unified_sse_manager
- from views.construction_review.file_upload import validate_upload_parameters
- from .schemas.error_schemas import LaunchReviewErrors
- launch_review_router = APIRouter(prefix="/sgsc", tags=["前端接口"])
- duplicatechecker = RedisDuplicateChecker()
- # 初始化工作流管理器
- workflow_manager = WorkflowManager(
- max_concurrent_docs=3,
- max_concurrent_reviews=5
- )
- # 初始化进度管理器
- progress_manager = ProgressManager()
- async def sse_progress_callback(callback_task_id: str, current_data: dict):
- """SSE推送回调函数 - 接收进度更新并推送到客户端"""
- await unified_sse_manager.send_progress(callback_task_id, current_data)
- class SimpleSSEManager:
- """
- SSE连接管理器 - 兼容性包装器,委托给统一SSE管理器
- 注意: 此类保持向后兼容,建议直接使用 unified_sse_manager
- """
- async def connect(self, callback_task_id: str, callback_func=None):
- """建立SSE连接"""
- return await unified_sse_manager.establish_connection(callback_task_id, callback_func)
- async def disconnect(self, callback_task_id: str):
- """断开SSE连接"""
- await unified_sse_manager.close_connection(callback_task_id)
- async def send_progress(self, callback_task_id: str, current_data: dict):
- """发送进度消息"""
- await unified_sse_manager.send_progress(callback_task_id, current_data)
- # 创建兼容性实例
- sse_manager = SimpleSSEManager()
- def format_sse_event(event_type: str, data: str) -> str:
- """格式化SSE事件 - 按照SSE协议格式化事件数据"""
- lines = [
- f"event: {event_type}",
- f"data: {data}",
- "",
- ""
- ]
- return "\n".join(lines) + "\n"
- class LaunchReviewRequest(BaseModel):
- """启动审查请求模型"""
- callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取")
- user_id: str = Field(..., description="用户标识")
- tendency_review_role: str = Field(
- ...,
- description="倾向性审查角色,暂定为 default_role"
- )
- review_config: Optional[List[str]] = Field(
- None,
- description="审查配置列表,包含的项为启用状态(审查维度枚举值)。与review_item_config互斥"
- )
- review_item_config: Optional[List[str]] = Field(
- None,
- description="审查项配置列表,格式为「章节code_审查维度」(如 basis_sensitive_word_check)。与review_config互斥"
- )
- project_plan_type: str = Field(
- ...,
- description="工程方案类型: 01_pf_Found_Rotary_Drill(旋挖钻机、冲击钻机成孔桩), 02_pf_Dig_Manual_Pile(人工挖孔桩), 03_bd_Sub_Cyl_Pier(圆柱墩、系梁、盖梁), 04_bd_Sub_Rect_Turn(矩形墩采用翻模工艺、系梁、盖梁), 05_bd_High_Rect_Slide(矩形墩采用爬模工艺、系梁、盖梁), 06_bu_Pre_SS_Beam(简支梁预制、运输及架桥机安装), 07_bu_Erect_Truck_TBeam(汽车式起重机安装T梁), 08_bu_Cast_Col_Support(梁柱式支架), 09_bu_Cast_Full_Support(满堂式支架), 10_bu_Cast_Cant_Trolley(挂篮), 11_se_Elev_Lift_Proj(起重吊装工程), 12_se_Tower_Crane_Proj(起重吊装设备安装), 13_o_Height_Work_Op(高空作业)"
- )
- test_designation_chunk_flag: Optional[str] = Field( # 标注为可选字符串
- None,
- description="测试定位标志符,用于指定特定审查片段(可选字段)"
- )
- class Config:
- extra = "forbid" # 禁止额外的字段
- class LaunchReviewResponse(BaseModel):
- """启动审查响应模型"""
- code: int
- data: dict
- def validate_review_config(review_config: List[str]) -> None:
- """验证审查配置参数"""
- # 检查review_config是否为空
- logger.info(f"审查配置: {review_config}")
- if not review_config or len(review_config) == 0:
- raise LaunchReviewErrors.enum_type_cannot_be_null()
- # 支持的审查项枚举值
- supported_review_items = {
- 'sensitive_word_check', # 词句语法检查
- 'semantic_logic_check', # 语义逻辑审查
- 'completeness_check', # 条文完整性审查
- 'timeliness_check', # 时效性审查
- 'reference_check', # 规范性审查
- 'sensitive_check', # 敏感词审查
- 'non_parameter_compliance_check', # 非参数合规性检查功能
- 'parameter_compliance_check', # 参数合规性检查功能
- }
- # 检查是否包含不支持的审查项
- unsupported_items = set(review_config) - supported_review_items
- if unsupported_items:
- raise LaunchReviewErrors.enum_type_invalid()
- def validate_review_config_mutually_exclusive(
- review_config: Optional[List[str]],
- review_item_config: Optional[List[str]]
- ) -> None:
- """
- 验证 review_config 与 review_item_config 互斥性
- 规则:
- 1. 两者不能同时提供(互斥)
- 2. 两者不能同时为空(必须提供其中一个)
- Args:
- review_config: 审查配置列表(审查维度枚举值)
- review_item_config: 审查项配置列表(章节_审查维度格式)
- Raises:
- HTTPException: 当两者同时提供或同时为空时抛出异常
- """
- # 判断字段是否存在(只判断是否为 None,不判断长度)
- has_review_config = review_config is not None
- has_review_item_config = review_item_config is not None
- # 情况1:两者都提供了 - 互斥错误(包括空列表的情况)
- if has_review_config and has_review_item_config:
- raise LaunchReviewErrors.mutually_exclusive_config()
- # 情况2:两者都为空 - 必须提供一个
- if not has_review_config and not has_review_item_config:
- raise LaunchReviewErrors.review_config_required()
- logger.info(f"审查配置验证通过: 使用{'review_config' if has_review_config else 'review_item_config'}")
- def validate_review_item_config(review_item_config: List[str]) -> None:
- """
- 验证审查项配置参数合法性
- 核心规则:每个传入值必须是「chapter_code_review_dimension」格式(如 basis_sensitive_word_check)
- :param review_item_config: 配置列表(元素为待验证的组合字符串)
- :raises ValueError: 配置为空、格式错误或包含不支持的code时抛出异常
- """
- # 1. 非空校验
- if not review_item_config:
- raise LaunchReviewErrors.enum_type_cannot_be_null()
- # 2. 去重检查
- if len(review_item_config) != len(set(review_item_config)):
- # 找出重复的项
- seen = set()
- duplicates = []
- for item in review_item_config:
- if item in seen:
- duplicates.append(item)
- seen.add(item)
- raise LaunchReviewErrors.duplicate_review_items(list(set(duplicates)))
- # 定义支持的基础枚举值
- supported = {
- "chapter_code": {
- "catalogue", "basis", "overview", "plan", "technology",
- "safety", "quality", "environment", "management",
- "acceptance", "other"
- },
- "review_dimensions": {
- "sensitive_word_check", "semantic_logic_check", "completeness_check",
- "timeliness_check", "reference_check", "sensitive_check",
- "non_parameter_compliance_check", "parameter_compliance_check"
- }
- }
- # 按规则校验每个配置项
- invalid_format = [] # 格式错误(不是下划线连接的两个部分)
- invalid_chapter = [] # 章节code不支持
- invalid_review = [] # 审查项code不支持
- catalogue_invalid = [] # 目录章节使用了非完整性审查
- for item in review_item_config:
- # 3. 校验格式:必须至少包含一个下划线(按第一个下划线分割)
- parts = item.split("_", 1) # maxsplit=1,只按第一个下划线分割
- if len(parts) != 2:
- invalid_format.append(item)
- continue
- chapter_code, review_dim = parts
- # 4. 校验章节code是否支持
- if chapter_code not in supported["chapter_code"]:
- invalid_chapter.append(chapter_code)
- continue # 章节不支持时不继续检查审查项
- # 5. 特殊规则:目录章节只能使用完整性审查
- if chapter_code == "catalogue" and review_dim != "completeness_check":
- catalogue_invalid.append(item)
- continue # 目录章节违反规则时不继续检查
- # 6. 校验审查项code是否支持
- if review_dim not in supported["review_dimensions"]:
- invalid_review.append(review_dim)
- # 批量抛出错误(按优先级:格式错误 → 重复 → 章节错误 → 目录特殊规则 → 审查项错误)
- if invalid_format:
- raise LaunchReviewErrors.invalid_review_item_format(invalid_format)
- if invalid_chapter:
- raise LaunchReviewErrors.invalid_chapter_code(list(set(invalid_chapter))) # 去重
- if catalogue_invalid:
- raise LaunchReviewErrors.catalogue_completeness_only(catalogue_invalid)
- if invalid_review:
- raise LaunchReviewErrors.invalid_review_dimension(list(set(invalid_review))) # 去重
- def validate_project_plan_type(project_plan_type: str) -> None:
- """验证工程方案类型"""
- # 当前支持的工程方案类型
- supported_types = {
- '01_pf_Found_Rotary_Drill', # 旋挖钻机、冲击钻机成孔桩
- '02_pf_Dig_Manual_Pile', # 人工挖孔桩
- '03_bd_Sub_Cyl_Pier', # 圆柱墩、系梁、盖梁
- '04_bd_Sub_Rect_Turn', # 矩形墩采用翻模工艺、系梁、盖梁
- '05_bd_High_Rect_Slide', # 矩形墩采用爬模工艺、系梁、盖梁
- '06_bu_Pre_SS_Beam', # 简支梁预制、运输及架桥机安装
- '07_bu_Erect_Truck_TBeam', # 汽车式起重机安装T梁
- '08_bu_Cast_Col_Support', # 梁柱式支架
- '09_bu_Cast_Full_Support', # 满堂式支架
- '10_bu_Cast_Cant_Trolley', # 挂篮
- '11_se_Elev_Lift_Proj', # 起重吊装工程
- '12_se_Tower_Crane_Proj', # 起重吊装设备安装
- '13_o_Height_Work_Op' # 高空作业
- }
- if project_plan_type not in supported_types:
- raise LaunchReviewErrors.project_plan_type_invalid()
- def validate_tendency_review_role(tendency_review_role: str) -> None:
- """验证倾向性审查角色"""
- # 当前支持的倾向性审查角色类型
- supported_roles = {
- 'default_role',
- 'COMMON',#通用
- 'SAFETY_ENVIRONMENT', # 安全环保
- 'QUALITY_COST', # 质量成本
- 'SCHEDULE_EFFICIENCY', # 进度效率
- 'TECH_COMPLIANCE' # 技术合规
- }
- if tendency_review_role not in supported_roles:
- raise LaunchReviewErrors.tendency_review_role_invalid()
- def validate_user_id(user_id: str) -> None:
- """验证用户标识"""
- # 当前支持的用户标识列表
- supported_users = {
- 'user-001'
- }
- if user_id not in supported_users:
- raise LaunchReviewErrors.invalid_user()
- @launch_review_router.post("/sse/launch_review")
- @auto_trace(generate_if_missing=True)
- async def launch_review_sse(request_data: LaunchReviewRequest):
- """
- 启动施工方案审查并返回SSE进度流
- Args:
- request_data: 启动审查请求参数
- Returns:
- StreamingResponse: SSE事件流,包含任务启动状态和进度
- """
- callback_task_id = request_data.callback_task_id
- TraceContext.set_trace_id(callback_task_id)
- user_id = request_data.user_id
- review_config = request_data.review_config
- review_item_config = request_data.review_item_config
- project_plan_type = request_data.project_plan_type
- tendency_review_role = request_data.tendency_review_role
- test_designation_chunk_flag = request_data.test_designation_chunk_flag
- logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}, user_id={user_id}, tendency_review_role={tendency_review_role}")
- # 验证用户标识
- validate_user_id(user_id)
- # 验证审查配置互斥性(优先验证互斥关系)
- validate_review_config_mutually_exclusive(review_config, review_item_config)
- # 根据提供的配置类型进行相应验证(只判断是否为 None)
- if review_config is not None:
- # 使用 review_config 时的验证
- if len(review_config) == 0:
- raise LaunchReviewErrors.review_config_required() # 提供了空列表,提示必须提供有效值
- validate_review_config(review_config)
- elif review_item_config is not None:
- # 使用 review_item_config 时的验证
- if len(review_item_config) == 0:
- raise LaunchReviewErrors.review_config_required() # 提供了空列表,提示必须提供有效值
- validate_review_item_config(review_item_config)
- # 验证工程方案类型
- validate_project_plan_type(project_plan_type)
- # 验证倾向性审查角色
- validate_tendency_review_role(tendency_review_role)
- # 使用统一SSE管理器建立连接并注册回调
- queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
- async def generate_launch_review_events():
- """生成启动审查SSE事件流"""
- try:
- # 发送连接确认
- connected_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "启动审查SSE连接",
- "status": "connected",
- "message": "启动审查SSE连接已建立,正在处理请求...",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "issues": []
- }, ensure_ascii=False)
- yield format_sse_event("connected", connected_data)
- # 处理启动审查逻辑
- try:
- # 从callback_task_id中提取file_id (格式: file_id-timestamp)
- file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
- logger.info(f"处理文件: {file_id}")
- # 发送处理状态
- status_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 5,
- "stage_name": f"验证文件信息: {file_id}",
- "status": "processing",
- "message": f"正在验证文件信息: {file_id}",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "issues": []
- }, ensure_ascii=False)
- yield format_sse_event("processing", status_data)
- # 验证任务ID是否存在且未过期
- if not await duplicatechecker.is_valid_task_id(callback_task_id):
- raise LaunchReviewErrors.task_not_found_or_expired()
- # 检查任务是否已经被使用启动审查
- if await duplicatechecker.is_task_already_used(callback_task_id):
- raise LaunchReviewErrors.task_already_exists()
- # 标记任务为已使用
- await duplicatechecker.mark_task_as_used(callback_task_id)
- file_info = await get_file_info(file_id, include_content=True)
- if not file_info:
- logger.error(f"文件信息获取失败: {file_id}")
- raise LaunchReviewErrors.file_info_not_found()
- # 立即更新Redis中的callback_task_id为当前值
- try:
- await store_file_info(file_id, {'callback_task_id': callback_task_id})
- logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}")
- except Exception as e:
- logger.warning(f"更新Redis中的callback_task_id失败: {e}")
- # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id
- file_info.update({
- 'user_id': user_id,
- 'review_config': review_config,
- 'review_item_config': review_item_config,
- 'project_plan_type': project_plan_type,
- 'tendency_review_role': tendency_review_role,
- 'test_designation_chunk_flag': test_designation_chunk_flag,
- 'launched_at': int(time.time()),
- 'callback_task_id': callback_task_id # 确保使用当前正确的callback_task_id
- })
- # 提交处理任务到工作流管理器
- await workflow_manager.submit_task_processing(file_info)
- # 发送成功启动状态
- success_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 10,
- "stage_name": "任务启动成功",
- "status": "submitted",
- "message": "施工方案审查任务启动成功,请耐心等待结果...",
- "overall_task_status": "processing",
- "updated_at": int(time.time()),
- "issues": []
- }, ensure_ascii=False)
- yield format_sse_event("submitted", success_data)
- # 继续监听工作流进度
- logger.info(f"开始监听工作流进度: {callback_task_id}")
- while True:
- try:
- message = await queue.get()
- # 处理所有类型的进度更新消息
- message_type = message.get("type")
- current_data = message.get("data")
- if current_data:
- # 根据消息类型决定数据格式
- if message_type == "unit_review_update":
- # 单元审查更新的特殊格式
- unified_data = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": current_data.get("current", 0),
- "stage_name": current_data.get("stage_name", "单元审查"),
- "status": "unit_review_update",
- "message": current_data.get("message", ""),
- "overall_task_status": current_data.get("overall_task_status", "processing"),
- "updated_at": current_data.get("updated_at", int(time.time())),
- "issues": current_data.get("issues", [])
- }
- else:
- # 通用进度更新格式(包括 processing_flag, processing, completed 等)
- unified_data = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": current_data.get("current", 0),
- "stage_name": current_data.get("stage_name", "处理中"),
- "status": current_data.get("status", "processing"),
- "message": current_data.get("message", ""),
- "overall_task_status": current_data.get("overall_task_status", "processing"),
- "updated_at": current_data.get("updated_at", int(time.time())),
- "issues": current_data.get("issues", [])
- }
- # 使用从progress_manager传递的事件类型,或回退到消息类型
- sse_event_type = current_data.get("event_type", message_type)
- if not sse_event_type:
- sse_event_type = "processing" # 最终回退
- logger.debug(f"生成SSE事件: {sse_event_type}, 消息类型: {message_type}, current: {current_data.get('current')}")
- unified_data_json = json.dumps(unified_data, ensure_ascii=False)
- yield format_sse_event(sse_event_type, unified_data_json)
- # 调试日志:记录接收到的消息
- logger.debug(f"收到消息 - 类型: {message_type}, 数据: {current_data}")
- # 特殊处理:跳过连接建立消息,避免误判为完成
- if message_type == "connection_established":
- logger.info(f"收到连接建立消息,继续监听: {callback_task_id}")
- continue
- # 特殊处理:收到连接关闭信号,立即结束SSE流
- if message_type == "connection_closed":
- completion_data = {
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 100,
- "stage_name": "审查完成",
- "status": "completed",
- "message": f"施工审查方案处理完成!",
- "overall_task_status": "completed",
- "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()),
- }
- completion_json = json.dumps(completion_data, ensure_ascii=False)
- yield format_sse_event("completed", completion_json)
- logger.info(f"收到连接关闭信号,结束SSE流: {callback_task_id}")
- logger.info(f"SSE状态: SSE回调已注销")
- break
- except Exception as e:
- logger.error(f"队列消息处理异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- break
- except HTTPException as e:
- logger.error(f"HTTP异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "处理异常",
- "status": "error",
- "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
- "overall_task_status": "failed",
- "updated_at": int(time.time()),
- "issues": [],
- "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error"
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- except Exception as e:
- logger.error(f"启动审查处理异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id,
- "current": 0,
- "stage_name": "内部错误",
- "status": "error",
- "message": f"服务端内部错误: {str(e)}",
- "overall_task_status": "failed",
- "updated_at": int(time.time()),
- "issues": [],
- "error": "internal_error"
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- except Exception as e:
- logger.error(f"启动审查SSE事件流异常: {callback_task_id}")
- logger.error(f"异常详情: {str(e)}")
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- error_data = json.dumps({
- "callback_task_id": callback_task_id,
- "user_id": user_id if 'user_id' in locals() else "unknown",
- "current": 0,
- "stage_name": "SSE流异常",
- "status": "error",
- "message": f"SSE流异常: {str(e)}",
- "overall_task_status": "failed",
- "updated_at": int(time.time()),
- "issues": [],
- "error": "sse_error"
- }, ensure_ascii=False)
- yield format_sse_event("error", error_data)
- finally:
- # 清理回调连接(确保资源被正确释放)
- try:
- sse_callback_manager.unregister_callback(callback_task_id)
- except Exception as cleanup_error:
- logger.warning(f"清理回调连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
- try:
- await unified_sse_manager.close_connection(callback_task_id)
- except Exception as cleanup_error:
- logger.warning(f"断开SSE连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
- logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
- return StreamingResponse(
- generate_launch_review_events(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache, no-store, must-revalidate",
- "Connection": "keep-alive",
- "Access-Control-Allow-Origin": "*",
- "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type",
- "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
- "X-Accel-Buffering": "no",
- "X-Content-Type-Options": "nosniff"
- }
- )
|