launch_review.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. """
  2. 施工方案审查启动接口
  3. 接收审查配置参数,启动AI审查工作流
  4. """
  5. import uuid
  6. import time
  7. import json
  8. import asyncio
  9. import traceback
  10. from datetime import datetime
  11. from typing import List, Optional, Dict, Any
  12. from pydantic import BaseModel, Field, validator
  13. from fastapi import APIRouter, HTTPException, Query
  14. from fastapi.responses import StreamingResponse
  15. from core.base.redis_duplicate_checker import RedisDuplicateChecker
  16. from foundation.logger.loggering import server_logger as logger
  17. from foundation.trace.trace_context import TraceContext, auto_trace
  18. from foundation.utils.redis_utils import get_file_info,store_file_info
  19. from core.base.workflow_manager import WorkflowManager
  20. from core.base.progress_manager import ProgressManager, sse_callback_manager
  21. from core.base.sse_manager import unified_sse_manager
  22. from views.construction_review.file_upload import validate_upload_parameters
  23. from .schemas.error_schemas import LaunchReviewErrors
  24. launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"])
  25. duplicatechecker = RedisDuplicateChecker()
  26. # 初始化工作流管理器
  27. workflow_manager = WorkflowManager(
  28. max_concurrent_docs=3,
  29. max_concurrent_reviews=5
  30. )
  31. # 初始化进度管理器
  32. progress_manager = ProgressManager()
  33. async def sse_progress_callback(callback_task_id: str, current_data: dict):
  34. """SSE推送回调函数 - 接收进度更新并推送到客户端"""
  35. await unified_sse_manager.send_progress(callback_task_id, current_data)
  36. class SimpleSSEManager:
  37. """
  38. SSE连接管理器 - 兼容性包装器,委托给统一SSE管理器
  39. 注意: 此类保持向后兼容,建议直接使用 unified_sse_manager
  40. """
  41. async def connect(self, callback_task_id: str, callback_func=None):
  42. """建立SSE连接"""
  43. return await unified_sse_manager.establish_connection(callback_task_id, callback_func)
  44. async def disconnect(self, callback_task_id: str):
  45. """断开SSE连接"""
  46. await unified_sse_manager.close_connection(callback_task_id)
  47. async def send_progress(self, callback_task_id: str, current_data: dict):
  48. """发送进度消息"""
  49. await unified_sse_manager.send_progress(callback_task_id, current_data)
  50. # 创建兼容性实例
  51. sse_manager = SimpleSSEManager()
  52. def format_sse_event(event_type: str, data: str) -> str:
  53. """格式化SSE事件 - 按照SSE协议格式化事件数据"""
  54. lines = [
  55. f"event: {event_type}",
  56. f"data: {data}",
  57. "",
  58. ""
  59. ]
  60. return "\n".join(lines) + "\n"
  61. class LaunchReviewRequest(BaseModel):
  62. """启动审查请求模型"""
  63. callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取")
  64. user_id: str = Field(..., description="用户标识")
  65. tendency_review_role: str = Field(
  66. ...,
  67. description="倾向性审查角色,暂定为 default_role"
  68. )
  69. review_config: List[str] = Field(
  70. ...,
  71. description="审查配置列表,包含的项为启用状态"
  72. )
  73. project_plan_type: str = Field(
  74. "bridge_up_part",
  75. description="工程方案类型,当前仅支持 bridge_up_part"
  76. )
  77. class Config:
  78. extra = "forbid" # 禁止额外的字段
  79. class LaunchReviewResponse(BaseModel):
  80. """启动审查响应模型"""
  81. code: int
  82. data: dict
  83. def validate_review_config(review_config: List[str]) -> None:
  84. """验证审查配置参数"""
  85. # 检查review_config是否为空
  86. if not review_config or len(review_config) == 0:
  87. raise LaunchReviewErrors.enum_type_cannot_be_null()
  88. # 支持的审查项枚举值
  89. supported_review_items = {
  90. 'sensitive_word_check', # 词句语法检查
  91. 'semantic_logic_check', # 语义逻辑审查
  92. 'completeness_check', # 条文完整性审查
  93. 'timeliness_check', # 时效性审查
  94. 'reference_check', # 规范性审查
  95. 'sensitive_words_check', # 敏感词审查
  96. 'mandatory_standards_check', # 强制性标准检查
  97. 'technical_parameters_check', # 技术参数精确检查
  98. 'design_values_check' # 设计值符合性检查
  99. }
  100. # 检查是否包含不支持的审查项
  101. unsupported_items = set(review_config) - supported_review_items
  102. if unsupported_items:
  103. raise LaunchReviewErrors.enum_type_invalid()
  104. def validate_project_plan_type(project_plan_type: str) -> None:
  105. """验证工程方案类型"""
  106. # 当前支持的工程方案类型
  107. supported_types = {'bridge_up_part'} # 桥梁上部结构
  108. if project_plan_type not in supported_types:
  109. raise LaunchReviewErrors.project_plan_type_invalid()
  110. def validate_tendency_review_role(tendency_review_role: str) -> None:
  111. """验证倾向性审查角色"""
  112. # 当前支持的倾向性审查角色类型
  113. supported_roles = {
  114. 'default_role', # 默认角色
  115. }
  116. if tendency_review_role not in supported_roles:
  117. raise LaunchReviewErrors.tendency_review_role_invalid()
  118. def validate_user_id(user_id: str) -> None:
  119. """验证用户标识"""
  120. # 当前支持的用户标识列表
  121. supported_users = {
  122. 'user-001'
  123. }
  124. if user_id not in supported_users:
  125. raise LaunchReviewErrors.invalid_user()
  126. @launch_review_router.post("/sse/launch_review")
  127. @auto_trace(generate_if_missing=True)
  128. async def launch_review_sse(request_data: LaunchReviewRequest):
  129. """
  130. 启动施工方案审查并返回SSE进度流
  131. Args:
  132. request_data: 启动审查请求参数
  133. Returns:
  134. StreamingResponse: SSE事件流,包含任务启动状态和进度
  135. """
  136. callback_task_id = request_data.callback_task_id
  137. TraceContext.set_trace_id(callback_task_id)
  138. user_id = request_data.user_id
  139. review_config = request_data.review_config
  140. project_plan_type = request_data.project_plan_type
  141. tendency_review_role = request_data.tendency_review_role
  142. logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}, user_id={user_id}, tendency_review_role={tendency_review_role}")
  143. # 验证用户标识
  144. validate_user_id(user_id)
  145. # 验证审查配置
  146. validate_review_config(review_config)
  147. # 验证工程方案类型
  148. validate_project_plan_type(project_plan_type)
  149. # 验证倾向性审查角色
  150. validate_tendency_review_role(tendency_review_role)
  151. # 使用统一SSE管理器建立连接并注册回调
  152. queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
  153. async def generate_launch_review_events():
  154. """生成启动审查SSE事件流"""
  155. try:
  156. # 发送连接确认
  157. connected_data = json.dumps({
  158. "callback_task_id": callback_task_id,
  159. "user_id": user_id,
  160. "current": 0,
  161. "stage_name": "启动审查SSE连接",
  162. "status": "connected",
  163. "message": "启动审查SSE连接已建立,正在处理请求...",
  164. "overall_task_status": "processing",
  165. "updated_at": int(time.time()),
  166. "issues": []
  167. }, ensure_ascii=False)
  168. yield format_sse_event("connected", connected_data)
  169. # 处理启动审查逻辑
  170. try:
  171. # 从callback_task_id中提取file_id (格式: file_id-timestamp)
  172. file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
  173. logger.info(f"处理文件: {file_id}")
  174. # 发送处理状态
  175. status_data = json.dumps({
  176. "callback_task_id": callback_task_id,
  177. "user_id": user_id,
  178. "current": 5,
  179. "stage_name": f"验证文件信息: {file_id}",
  180. "status": "processing",
  181. "message": f"正在验证文件信息: {file_id}",
  182. "overall_task_status": "processing",
  183. "updated_at": int(time.time()),
  184. "issues": []
  185. }, ensure_ascii=False)
  186. yield format_sse_event("processing", status_data)
  187. # 验证任务ID是否存在且未过期
  188. if not await duplicatechecker.is_valid_task_id(callback_task_id):
  189. raise LaunchReviewErrors.task_not_found_or_expired()
  190. # 检查任务是否已经被使用启动审查
  191. if await duplicatechecker.is_task_already_used(callback_task_id):
  192. raise LaunchReviewErrors.task_already_exists()
  193. # 标记任务为已使用
  194. await duplicatechecker.mark_task_as_used(callback_task_id)
  195. file_info = await get_file_info(file_id, include_content=True)
  196. if not file_info:
  197. logger.error(f"文件信息获取失败: {file_id}")
  198. raise LaunchReviewErrors.file_info_not_found()
  199. # 立即更新Redis中的callback_task_id为当前值
  200. try:
  201. await store_file_info(file_id, {'callback_task_id': callback_task_id})
  202. logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}")
  203. except Exception as e:
  204. logger.warning(f"更新Redis中的callback_task_id失败: {e}")
  205. # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id
  206. file_info.update({
  207. 'user_id': user_id,
  208. 'review_config': review_config,
  209. 'project_plan_type': project_plan_type,
  210. 'tendency_review_role': tendency_review_role,
  211. 'launched_at': int(time.time()),
  212. 'callback_task_id': callback_task_id # 确保使用当前正确的callback_task_id
  213. })
  214. # 提交处理任务到工作流管理器
  215. await workflow_manager.submit_task_processing(file_info)
  216. # 发送成功启动状态
  217. success_data = json.dumps({
  218. "callback_task_id": callback_task_id,
  219. "user_id": user_id,
  220. "current": 10,
  221. "stage_name": "任务启动成功",
  222. "status": "submitted",
  223. "message": "施工方案审查任务启动成功,请耐心等待结果...",
  224. "overall_task_status": "processing",
  225. "updated_at": int(time.time()),
  226. "issues": []
  227. }, ensure_ascii=False)
  228. yield format_sse_event("submitted", success_data)
  229. # 继续监听工作流进度
  230. logger.info(f"开始监听工作流进度: {callback_task_id}")
  231. while True:
  232. try:
  233. message = await queue.get()
  234. # 处理所有类型的进度更新消息
  235. message_type = message.get("type")
  236. current_data = message.get("data")
  237. if current_data:
  238. # 根据消息类型决定数据格式
  239. if message_type == "unit_review_update":
  240. # 单元审查更新的特殊格式
  241. unified_data = {
  242. "callback_task_id": callback_task_id,
  243. "user_id": user_id,
  244. "current": current_data.get("current", 0),
  245. "stage_name": current_data.get("stage_name", "单元审查"),
  246. "status": "unit_review_update",
  247. "message": current_data.get("message", ""),
  248. "overall_task_status": current_data.get("overall_task_status", "processing"),
  249. "updated_at": current_data.get("updated_at", int(time.time())),
  250. "issues": current_data.get("issues", [])
  251. }
  252. else:
  253. # 通用进度更新格式(包括 processing_flag, processing, completed 等)
  254. unified_data = {
  255. "callback_task_id": callback_task_id,
  256. "user_id": user_id,
  257. "current": current_data.get("current", 0),
  258. "stage_name": current_data.get("stage_name", "处理中"),
  259. "status": current_data.get("status", "processing"),
  260. "message": current_data.get("message", ""),
  261. "overall_task_status": current_data.get("overall_task_status", "processing"),
  262. "updated_at": current_data.get("updated_at", int(time.time())),
  263. "issues": current_data.get("issues", [])
  264. }
  265. # 使用从progress_manager传递的事件类型,或回退到消息类型
  266. sse_event_type = current_data.get("event_type", message_type)
  267. if not sse_event_type:
  268. sse_event_type = "processing" # 最终回退
  269. logger.debug(f"生成SSE事件: {sse_event_type}, 消息类型: {message_type}, current: {current_data.get('current')}")
  270. unified_data_json = json.dumps(unified_data, ensure_ascii=False)
  271. yield format_sse_event(sse_event_type, unified_data_json)
  272. # 调试日志:记录接收到的消息
  273. logger.debug(f"收到消息 - 类型: {message_type}, 数据: {current_data}")
  274. # 特殊处理:跳过连接建立消息,避免误判为完成
  275. if message_type == "connection_established":
  276. logger.info(f"收到连接建立消息,继续监听: {callback_task_id}")
  277. continue
  278. # 特殊处理:收到连接关闭信号,立即结束SSE流
  279. if message_type == "connection_closed":
  280. completion_data = {
  281. "callback_task_id": callback_task_id,
  282. "user_id": user_id,
  283. "current": 100,
  284. "stage_name": "审查完成",
  285. "status": "completed",
  286. "message": f"施工审查方案处理完成!",
  287. "overall_task_status": "completed",
  288. "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()),
  289. }
  290. completion_json = json.dumps(completion_data, ensure_ascii=False)
  291. yield format_sse_event("completed", completion_json)
  292. logger.info(f"收到连接关闭信号,结束SSE流: {callback_task_id}")
  293. logger.info(f"SSE状态: SSE回调已注销")
  294. break
  295. except Exception as e:
  296. logger.error(f"队列消息处理异常: {callback_task_id}")
  297. logger.error(f"异常详情: {str(e)}")
  298. logger.error(f"异常堆栈: {traceback.format_exc()}")
  299. break
  300. except HTTPException as e:
  301. logger.error(f"HTTP异常: {callback_task_id}")
  302. logger.error(f"异常详情: {str(e)}")
  303. logger.error(f"异常堆栈: {traceback.format_exc()}")
  304. error_data = json.dumps({
  305. "callback_task_id": callback_task_id,
  306. "user_id": user_id,
  307. "current": 0,
  308. "stage_name": "处理异常",
  309. "status": "error",
  310. "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
  311. "overall_task_status": "failed",
  312. "updated_at": int(time.time()),
  313. "issues": [],
  314. "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error"
  315. }, ensure_ascii=False)
  316. yield format_sse_event("error", error_data)
  317. except Exception as e:
  318. logger.error(f"启动审查处理异常: {callback_task_id}")
  319. logger.error(f"异常详情: {str(e)}")
  320. logger.error(f"异常堆栈: {traceback.format_exc()}")
  321. error_data = json.dumps({
  322. "callback_task_id": callback_task_id,
  323. "user_id": user_id,
  324. "current": 0,
  325. "stage_name": "内部错误",
  326. "status": "error",
  327. "message": f"服务端内部错误: {str(e)}",
  328. "overall_task_status": "failed",
  329. "updated_at": int(time.time()),
  330. "issues": [],
  331. "error": "internal_error"
  332. }, ensure_ascii=False)
  333. yield format_sse_event("error", error_data)
  334. except Exception as e:
  335. logger.error(f"启动审查SSE事件流异常: {callback_task_id}")
  336. logger.error(f"异常详情: {str(e)}")
  337. logger.error(f"异常堆栈: {traceback.format_exc()}")
  338. error_data = json.dumps({
  339. "callback_task_id": callback_task_id,
  340. "user_id": user_id if 'user_id' in locals() else "unknown",
  341. "current": 0,
  342. "stage_name": "SSE流异常",
  343. "status": "error",
  344. "message": f"SSE流异常: {str(e)}",
  345. "overall_task_status": "failed",
  346. "updated_at": int(time.time()),
  347. "issues": [],
  348. "error": "sse_error"
  349. }, ensure_ascii=False)
  350. yield format_sse_event("error", error_data)
  351. finally:
  352. # 清理回调连接(确保资源被正确释放)
  353. try:
  354. sse_callback_manager.unregister_callback(callback_task_id)
  355. except Exception as cleanup_error:
  356. logger.warning(f"清理回调连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
  357. try:
  358. await unified_sse_manager.close_connection(callback_task_id)
  359. except Exception as cleanup_error:
  360. logger.warning(f"断开SSE连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
  361. logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
  362. return StreamingResponse(
  363. generate_launch_review_events(),
  364. media_type="text/event-stream",
  365. headers={
  366. "Cache-Control": "no-cache, no-store, must-revalidate",
  367. "Connection": "keep-alive",
  368. "Access-Control-Allow-Origin": "*",
  369. "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type",
  370. "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  371. "X-Accel-Buffering": "no",
  372. "X-Content-Type-Options": "nosniff"
  373. }
  374. )