# launch_review.py (15 KB)
"""
Launch endpoint for construction-plan review.

Receives the review configuration parameters and starts the AI review workflow.
"""
  5. import uuid
  6. import time
  7. import json
  8. import asyncio
  9. import traceback
  10. from datetime import datetime
  11. from typing import List, Optional, Dict, Any
  12. from pydantic import BaseModel, Field
  13. from fastapi import APIRouter, HTTPException, Query
  14. from fastapi.responses import StreamingResponse
  15. from core.base.redis_duplicate_checker import RedisDuplicateChecker
  16. from foundation.logger.loggering import server_logger as logger
  17. from foundation.trace.trace_context import TraceContext, auto_trace
  18. from foundation.utils.redis_utils import get_file_info, delete_file_info
  19. from core.base.workflow_manager import WorkflowManager
  20. from core.base.progress_manager import ProgressManager, sse_callback_manager
  21. from views.construction_review.file_upload import validate_upload_parameters
  22. from .schemas.error_schemas import LaunchReviewErrors
  23. launch_review_router = APIRouter(prefix="/sgsc", tags=["审查启动"])
  24. duplicatechecker = RedisDuplicateChecker()
  25. # 初始化工作流管理器
  26. workflow_manager = WorkflowManager(
  27. max_concurrent_docs=3,
  28. max_concurrent_reviews=5
  29. )
  30. # 初始化进度管理器
  31. progress_manager = ProgressManager()
  32. async def sse_progress_callback(callback_task_id: str, current_data: dict):
  33. """SSE推送回调函数 - 接收进度更新并推送到客户端"""
  34. await sse_manager.send_progress(callback_task_id, current_data)
  35. class SimpleSSEManager:
  36. """SSE连接管理器 - 管理客户端SSE连接和消息推送"""
  37. def __init__(self):
  38. self.connections: Dict[str, asyncio.Queue] = {}
  39. async def connect(self, callback_task_id: str):
  40. """建立SSE连接 - 创建消息队列并发送连接确认"""
  41. queue = asyncio.Queue()
  42. self.connections[callback_task_id] = queue
  43. await queue.put({
  44. "type": "connection_established",
  45. "callback_task_id": callback_task_id,
  46. "timestamp": datetime.now().isoformat()
  47. })
  48. logger.info(f"SSE连接: {callback_task_id}")
  49. return queue
  50. async def disconnect(self, callback_task_id: str):
  51. """断开SSE连接 - 清理连接队列"""
  52. if callback_task_id in self.connections:
  53. del self.connections[callback_task_id]
  54. logger.info(f"SSE连接已断开: {callback_task_id}")
  55. async def send_progress(self, callback_task_id: str, current_data: dict):
  56. """发送进度更新 - 将进度数据放入队列推送给客户端"""
  57. queue = self.connections.get(callback_task_id)
  58. if queue:
  59. await queue.put({
  60. "type": "progress_update",
  61. "data": current_data,
  62. "timestamp": datetime.now().isoformat()
  63. })
  64. logger.debug(f"SSE进度已推送: {callback_task_id}")
  65. sse_manager = SimpleSSEManager()
  66. def format_sse_event(event_type: str, data: str) -> str:
  67. """格式化SSE事件 - 按照SSE协议格式化事件数据"""
  68. lines = [
  69. f"event: {event_type}",
  70. f"data: {data}",
  71. "",
  72. ""
  73. ]
  74. return "\n".join(lines) + "\n"
  75. class LaunchReviewRequest(BaseModel):
  76. """启动审查请求模型"""
  77. callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取")
  78. review_config: List[str] = Field(
  79. ...,
  80. description="审查配置列表,包含的项为启用状态"
  81. )
  82. project_plan_type: str = Field(
  83. "bridge_up_part",
  84. description="工程方案类型,当前仅支持 bridge_up_part"
  85. )
  86. class Config:
  87. extra = "forbid" # 禁止额外的字段
  88. class LaunchReviewResponse(BaseModel):
  89. """启动审查响应模型"""
  90. code: int
  91. data: dict
  92. def validate_review_config(review_config: List[str]) -> None:
  93. """验证审查配置参数"""
  94. # 检查review_config是否为空
  95. if not review_config or len(review_config) == 0:
  96. raise LaunchReviewErrors.enum_type_cannot_be_null()
  97. # 支持的审查项枚举值
  98. supported_review_items = {
  99. 'sensitive_word_check', # 词句语法检查
  100. 'semantic_logic_check', # 语义逻辑审查
  101. 'completeness_check', # 条文完整性审查
  102. 'timeliness_check', # 时效性审查
  103. 'reference_check', # 规范性审查
  104. 'sensitive_words_check', # 敏感词审查
  105. 'mandatory_standards_check', # 强制性标准检查
  106. 'technical_parameters_check', # 技术参数精确检查
  107. 'design_values_check' # 设计值符合性检查
  108. }
  109. # 检查是否包含不支持的审查项
  110. unsupported_items = set(review_config) - supported_review_items
  111. if unsupported_items:
  112. raise LaunchReviewErrors.enum_type_invalid()
  113. def validate_project_plan_type(project_plan_type: str) -> None:
  114. """验证工程方案类型"""
  115. # 当前支持的工程方案类型
  116. supported_types = {'bridge_up_part'} # 桥梁上部结构
  117. if project_plan_type not in supported_types:
  118. raise LaunchReviewErrors.project_plan_type_invalid()
  119. @launch_review_router.post("/sse/launch_review")
  120. @auto_trace(generate_if_missing=True)
  121. async def launch_review_sse(request_data: LaunchReviewRequest):
  122. """
  123. 启动施工方案审查并返回SSE进度流
  124. Args:
  125. request_data: 启动审查请求参数
  126. Returns:
  127. StreamingResponse: SSE事件流,包含任务启动状态和进度
  128. """
  129. callback_task_id = request_data.callback_task_id
  130. TraceContext.set_trace_id(callback_task_id)
  131. review_config = request_data.review_config
  132. project_plan_type = request_data.project_plan_type
  133. logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}")
  134. # 验证审查配置
  135. validate_review_config(review_config)
  136. # 验证工程方案类型
  137. validate_project_plan_type(project_plan_type)
  138. # 注册SSE回调
  139. sse_callback_manager.register_callback(callback_task_id, sse_progress_callback)
  140. queue = await sse_manager.connect(callback_task_id)
  141. async def generate_launch_review_events():
  142. """生成启动审查SSE事件流"""
  143. try:
  144. # 发送连接确认
  145. connected_data = json.dumps({
  146. "callback_task_id": callback_task_id,
  147. "message": "启动审查SSE连接已建立,正在处理请求...",
  148. "timestamp": datetime.now().isoformat()
  149. }, ensure_ascii=False)
  150. yield format_sse_event("connected", connected_data)
  151. # 处理启动审查逻辑
  152. try:
  153. from foundation.utils.redis_utils import get_file_info
  154. # 从callback_task_id中提取file_id (格式: file_id-timestamp)
  155. file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
  156. # 发送处理状态
  157. status_data = json.dumps({
  158. "callback_task_id": callback_task_id,
  159. "stage": "validation",
  160. "message": f"正在验证文件信息: {file_id}",
  161. "timestamp": datetime.now().isoformat()
  162. }, ensure_ascii=False)
  163. yield format_sse_event("processing", status_data)
  164. # 检查重复任务
  165. if await duplicatechecker.is_duplicate_task(file_id):
  166. error_data = json.dumps({
  167. "callback_task_id": callback_task_id,
  168. "error": "task_already_exists",
  169. "message": "任务已存在,请勿重复提交",
  170. "timestamp": datetime.now().isoformat()
  171. }, ensure_ascii=False)
  172. yield format_sse_event("error", error_data)
  173. return
  174. # 获取文件信息
  175. status_data = json.dumps({
  176. "callback_task_id": callback_task_id,
  177. "stage": "loading",
  178. "message": "正在加载文件信息...",
  179. "timestamp": datetime.now().isoformat()
  180. }, ensure_ascii=False)
  181. yield format_sse_event("processing", status_data)
  182. file_info = await get_file_info(file_id, include_content=True)
  183. if not file_info:
  184. error_data = json.dumps({
  185. "callback_task_id": callback_task_id,
  186. "error": "task_not_found",
  187. "message": "任务ID不存在或已过期",
  188. "timestamp": datetime.now().isoformat()
  189. }, ensure_ascii=False)
  190. yield format_sse_event("error", error_data)
  191. return
  192. # 验证必要的字段
  193. if 'file_content' not in file_info:
  194. error_data = json.dumps({
  195. "callback_task_id": callback_task_id,
  196. "error": "missing_content",
  197. "message": "文件内容缺失",
  198. "timestamp": datetime.now().isoformat()
  199. }, ensure_ascii=False)
  200. yield format_sse_event("error", error_data)
  201. return
  202. # 添加审查配置到文件信息
  203. file_info.update({
  204. 'review_config': review_config,
  205. 'project_plan_type': project_plan_type,
  206. 'launched_at': int(time.time())
  207. })
  208. # 发送提交任务状态
  209. status_data = json.dumps({
  210. "callback_task_id": callback_task_id,
  211. "stage": "submitting",
  212. "message": "正在提交AI审查任务...",
  213. "timestamp": datetime.now().isoformat()
  214. }, ensure_ascii=False)
  215. yield format_sse_event("processing", status_data)
  216. # 提交处理任务到工作流管理器
  217. task_id = await workflow_manager.submit_task_processing(file_info)
  218. # 发送成功启动状态
  219. success_data = json.dumps({
  220. "callback_task_id": callback_task_id,
  221. "task_id": task_id,
  222. "file_id": file_info['file_id'],
  223. "review_config": review_config,
  224. "project_plan_type": project_plan_type,
  225. "status": "submitted",
  226. "submitted_at": file_info['launched_at'],
  227. "message": "AI审查任务已成功启动",
  228. "timestamp": datetime.now().isoformat()
  229. }, ensure_ascii=False)
  230. yield format_sse_event("submitted", success_data)
  231. # 继续监听工作流进度
  232. logger.info(f"开始监听工作流进度: {callback_task_id}")
  233. while True:
  234. try:
  235. message = await queue.get()
  236. if message.get("type") == "progress_update":
  237. current_data = message.get("data")
  238. if current_data:
  239. progress_json = json.dumps(current_data, ensure_ascii=False)
  240. yield format_sse_event("progress", progress_json)
  241. overall_task_status = current_data.get("overall_task_status")
  242. if overall_task_status in ["completed", "failed"]:
  243. completion_data = {
  244. "callback_task_id": callback_task_id,
  245. "task_status": overall_task_status,
  246. "overall_progress": current_data.get("current", 100),
  247. "timestamp": datetime.now().isoformat(),
  248. "message": "审查任务处理完成!"
  249. }
  250. completion_json = json.dumps(completion_data, ensure_ascii=False)
  251. yield format_sse_event("completed", completion_json)
  252. break
  253. except Exception as e:
  254. logger.error(f"队列消息处理异常: {callback_task_id}")
  255. logger.error(f"异常详情: {str(e)}")
  256. logger.error(f"异常堆栈: {traceback.format_exc()}")
  257. break
  258. except HTTPException as e:
  259. logger.error(f"HTTP异常: {callback_task_id}")
  260. logger.error(f"异常详情: {str(e)}")
  261. logger.error(f"异常堆栈: {traceback.format_exc()}")
  262. error_data = json.dumps({
  263. "callback_task_id": callback_task_id,
  264. "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error",
  265. "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
  266. "timestamp": datetime.now().isoformat()
  267. }, ensure_ascii=False)
  268. yield format_sse_event("error", error_data)
  269. except Exception as e:
  270. logger.error(f"启动审查处理异常: {callback_task_id}")
  271. logger.error(f"异常详情: {str(e)}")
  272. logger.error(f"异常堆栈: {traceback.format_exc()}")
  273. error_data = json.dumps({
  274. "callback_task_id": callback_task_id,
  275. "error": "internal_error",
  276. "message": f"服务端内部错误: {str(e)}",
  277. "timestamp": datetime.now().isoformat()
  278. }, ensure_ascii=False)
  279. yield format_sse_event("error", error_data)
  280. except Exception as e:
  281. logger.error(f"启动审查SSE事件流异常: {callback_task_id}")
  282. logger.error(f"异常详情: {str(e)}")
  283. logger.error(f"异常堆栈: {traceback.format_exc()}")
  284. error_data = json.dumps({
  285. "callback_task_id": callback_task_id,
  286. "error": "sse_error",
  287. "message": f"SSE流异常: {str(e)}",
  288. "timestamp": datetime.now().isoformat()
  289. }, ensure_ascii=False)
  290. yield format_sse_event("error", error_data)
  291. finally:
  292. # 清理回调连接
  293. sse_callback_manager.unregister_callback(callback_task_id)
  294. await sse_manager.disconnect(callback_task_id)
  295. logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
  296. return StreamingResponse(
  297. generate_launch_review_events(),
  298. media_type="text/event-stream",
  299. headers={
  300. "Cache-Control": "no-cache, no-store, must-revalidate",
  301. "Connection": "keep-alive",
  302. "Access-Control-Allow-Origin": "*",
  303. "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type",
  304. "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
  305. "X-Accel-Buffering": "no",
  306. "X-Content-Type-Options": "nosniff"
  307. }
  308. )
  309. @launch_review_router.get("/sse/launch_review_status")
  310. async def get_launch_review_sse_status():
  311. """获取启动审查SSE连接状态 - 返回当前活跃的启动审查SSE连接信息"""
  312. return {
  313. "active_connections": len(sse_manager.connections),
  314. "connections": list(sse_manager.connections.keys()),
  315. "timestamp": datetime.now().isoformat(),
  316. "service": "launch_review_sse"
  317. }