Browse Source

fix: 非专业性审查链路无执行日志 — POST SSE 响应被前端提前断开

根因: 前端 connectFromResponse 读取 POST 响应流的 started 事件获取
task_id 后调用 reader.cancel(),触发后端 StreamingResponse finally 块的
exec_task.cancel(),后台审查任务在开始前即被取消。

修复: POST /debug/api/review/execute 改为返回 JSON {task_id},
后台任务在独立协程中运行,通过 event_queue 推送进度。
前端拿到 task_id 后直接通过 EventSource 连接 GET SSE 流端点,
无需先读 POST 流再断开。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 1 tuần trước cách đây
mục cha
commit
2372f9cb85

+ 2 - 2
frontend/src/pages/DebugWorkbench.tsx

@@ -415,8 +415,8 @@ export default function DebugWorkbench() {
       };
 
       try {
-        const response = await executeReview(requestParams);
-        const taskId = await sse.connectFromResponse(response, {
+        const { task_id: taskId } = await executeReview(requestParams);
+        sse.connect(taskId, {
           onStarted: handleSSEStarted,
           onStepProgress: handleSSEStepProgress,
           onStepResult: handleSSEStepResult,

+ 2 - 2
frontend/src/services/debugApi.ts

@@ -15,7 +15,7 @@ export interface PromptVersionInfo {
 export async function executeReview(
   params: DebugExecuteRequest,
   signal?: AbortSignal,
-): Promise<Response> {
+): Promise<{ task_id: string }> {
   const res = await fetch(`${BASE_URL}/review/execute`, {
     method: 'POST',
     headers: { 'Content-Type': 'application/json' },
@@ -32,7 +32,7 @@ export async function executeReview(
     }
     throw new Error(message);
   }
-  return res;
+  return res.json();
 }
 
 export async function fetchPromptVersions(): Promise<PromptVersionInfo[]> {

+ 123 - 129
views/debug/debug_api.py

@@ -387,22 +387,13 @@ register_rag_debug_routes(debug_router)
 @debug_router.post("/api/review/execute")
 async def execute_review(request: DebugExecuteRequest):
     """
-    执行审查调试(SSE 流式返回)
+    启动审查调试任务,返回 task_id。
 
-    支持全部 7 个审查链路:
-    - completeness: 完整性审查
-    - timeliness: 时效性审查
-    - reference: 规范性审查
-    - sensitive: 敏感词检查
-    - semantic: 语义逻辑检查
-    - grammar: 语法检查
-    - professional: 专业性审查
-
-    支持环节隔离模式,可单独执行指定步骤。
-    调试执行不进 Celery 队列,直接在请求协程中执行,支持实时 SSE 推送。
+    前端拿到 task_id 后通过 GET /debug/api/review/stream/{task_id} (EventSource)
+    接收 SSE 实时进度推送。
 
+    支持全部 7 个审查链路。
     并发限制:最多 5 个调试任务同时执行。
-    超时控制:全局默认 180s。
     """
     # ---- 检查并发上限 ----
     if debug_semaphore.locked():
@@ -416,116 +407,116 @@ async def execute_review(request: DebugExecuteRequest):
 
     chain_id = request.chain_id
     total_steps = CHAIN_STEPS_COUNT.get(chain_id, 3)
+    event_queue: asyncio.Queue = asyncio.Queue()
+    task_id = make_trace_id(chain_id)
+    record_id = make_record_id()
 
-    async def event_generator():
-        async with debug_semaphore:
-            event_queue: asyncio.Queue = asyncio.Queue()
-            task_id = make_trace_id(chain_id)
-            record_id = make_record_id()
+    _running_tasks[task_id] = event_queue
+
+    # 后台启动执行任务(不 await,让它在后台运行)
+    asyncio.create_task(_background_execute(
+        request, event_queue, task_id, record_id, chain_id,
+    ))
+
+    return {
+        "task_id": task_id,
+        "chain_id": chain_id,
+        "total_steps": total_steps,
+    }
 
-            # 缓存任务队列供断线重连
-            _running_tasks[task_id] = event_queue
 
-            # ---- 发送 started 事件 ----
-            yield format_sse_event("started", {
+async def _background_execute(
+    request: DebugExecuteRequest,
+    event_queue: asyncio.Queue,
+    task_id: str,
+    record_id: str,
+    chain_id: str,
+) -> None:
+    """后台执行审查调试,通过 event_queue 推送进度"""
+    async with debug_semaphore:
+        completed_data = None
+        error_occurred = None
+        steps_collected: List[dict] = []
+
+        try:
+            # 发送 started 事件
+            await event_queue.put(("started", {
                 "task_id": task_id,
                 "chain_id": chain_id,
-                "total_steps": total_steps,
-            })
+                "total_steps": CHAIN_STEPS_COUNT.get(chain_id, 3),
+            }))
 
-            # ---- 后台执行任务 ----
+            # 启动执行器
             exec_task = asyncio.create_task(
                 _run_debug_execution(request, event_queue, task_id, record_id)
             )
 
-            completed_data = None
-            error_occurred = None
-            steps_collected: List[dict] = []
-
-            try:
-                # ---- 消费队列事件 ----
-                while True:
-                    try:
-                        event_type, data = await asyncio.wait_for(
-                            event_queue.get(),
-                            timeout=DEBUG_GLOBAL_TIMEOUT,
-                        )
-                    except asyncio.TimeoutError:
-                        error_occurred = "执行超时"
-                        yield format_sse_event("error", {
-                            "task_id": task_id,
-                            "message": f"执行超时 (>{DEBUG_GLOBAL_TIMEOUT}s)",
-                        })
-                        exec_task.cancel()
-                        break
-
-                    if event_type == "__end__":
-                        break
-
-                    # 跳过 executor 的 started(我们已发送自定义 started)
-                    if event_type == "started":
-                        continue
-
-                    # 收集步骤数据用于保存记录
-                    if event_type == "step_result":
-                        steps_collected.append({
-                            "index": data.get("step_index"),
-                            "name": data.get("step_name"),
-                            "status": data.get("status"),
-                            "duration_ms": int((data.get("duration") or 0) * 1000),
-                            "input": data.get("input", {}),
-                            "output": data.get("output", {}),
-                        })
-
-                    if event_type == "error":
-                        error_occurred = data.get("message", "未知错误")
-
-                    if event_type == "completed":
-                        completed_data = data
-
-                    yield format_sse_event(event_type, data)
-
-                    if event_type == "completed":
-                        break
-
-            except asyncio.CancelledError:
-                yield format_sse_event("error", {
-                    "task_id": task_id,
-                    "message": "连接已断开,任务被取消",
-                })
-            except Exception as exc:
-                logger.exception("[execute_review] 事件处理异常")
-                yield format_sse_event("error", {
-                    "task_id": task_id,
-                    "message": str(exc),
-                })
-            finally:
-                _running_tasks.pop(task_id, None)
-                if not exec_task.done():
-                    exec_task.cancel()
-
-                # ---- 自动保存调用记录 ----
+            # 消费队列事件
+            while True:
                 try:
-                    await _save_debug_record(
-                        request=request,
-                        task_id=task_id,
-                        record_id=record_id,
-                        chain_id=chain_id,
-                        completed_data=completed_data,
-                        error_message=error_occurred,
-                        steps=steps_collected,
+                    event_type, data = await asyncio.wait_for(
+                        event_queue.get(),
+                        timeout=DEBUG_GLOBAL_TIMEOUT,
                     )
-                except Exception as exc:
-                    logger.warning("[execute_review] 保存调用记录失败: %s", exc)
+                except asyncio.TimeoutError:
+                    error_occurred = "执行超时"
+                    await event_queue.put(("error", {
+                        "task_id": task_id,
+                        "message": f"执行超时 (>{DEBUG_GLOBAL_TIMEOUT}s)",
+                    }))
+                    exec_task.cancel()
+                    break
 
-    return StreamingResponse(
-        event_generator(),
-        media_type="text/event-stream",
-        headers={
-            "Cache-Control": "no-cache",
-            "X-Accel-Buffering": "no",
-        },
-    )
+                if event_type == "__end__":
+                    break
+
+                if event_type == "started":
+                    continue
+
+                if event_type == "step_result":
+                    steps_collected.append({
+                        "index": data.get("step_index"),
+                        "name": data.get("step_name"),
+                        "status": data.get("status"),
+                        "duration_ms": int((data.get("duration") or 0) * 1000),
+                        "input": data.get("input", {}),
+                        "output": data.get("output", {}),
+                    })
+
+                if event_type == "error":
+                    error_occurred = data.get("message", "未知错误")
+
+                if event_type == "completed":
+                    completed_data = data
+                    break
+
+        except asyncio.CancelledError:
+            await event_queue.put(("error", {
+                "task_id": task_id,
+                "message": "任务被取消",
+            }))
+        except Exception as exc:
+            logger.exception("[_background_execute] 执行异常")
+            await event_queue.put(("error", {
+                "task_id": task_id,
+                "message": str(exc),
+            }))
+        finally:
+            _running_tasks.pop(task_id, None)
+
+            # 自动保存调用记录
+            try:
+                await _save_debug_record(
+                    request=request,
+                    task_id=task_id,
+                    record_id=record_id,
+                    chain_id=chain_id,
+                    completed_data=completed_data,
+                    error_message=error_occurred,
+                    steps=steps_collected,
+                )
+            except Exception as exc:
+                logger.warning("[execute_review] 保存调用记录失败: %s", exc)
 
 
 # ============ SSE 断线重连端点 ============
@@ -555,25 +546,28 @@ async def stream_review_progress(task_id: str = Path(..., description="任务 ID
         })
 
         # 继续从原队列消费事件
-        while True:
-            try:
-                event_type, data = await asyncio.wait_for(
-                    queue.get(),
-                    timeout=DEBUG_GLOBAL_TIMEOUT,
-                )
-            except asyncio.TimeoutError:
-                yield format_sse_event("error", {
-                    "task_id": task_id,
-                    "message": "重连等待超时",
-                })
-                break
-
-            if event_type == "__end__":
-                break
-            yield format_sse_event(event_type, data)
-            if event_type == "completed":
-                break
-            queue.task_done()
+        try:
+            while True:
+                try:
+                    event_type, data = await asyncio.wait_for(
+                        queue.get(),
+                        timeout=DEBUG_GLOBAL_TIMEOUT,
+                    )
+                except asyncio.TimeoutError:
+                    yield format_sse_event("error", {
+                        "task_id": task_id,
+                        "message": "重连等待超时",
+                    })
+                    break
+
+                if event_type == "__end__":
+                    break
+                yield format_sse_event(event_type, data)
+
+                if event_type in ("completed", "error"):
+                    break
+        finally:
+            _running_tasks.pop(task_id, None)
 
     return StreamingResponse(
         event_generator(),