Prechádzať zdrojové kódy

fix: 审查执行卡住 — _background_execute 与 SSE 端点竞争消费 event_queue

问题1: _background_execute 和 GET SSE 端点同时消费同一个 asyncio.Queue,
后台任务抢走了 step_result/completed 事件,前端 EventSource 收不到任何进度。

修复: _background_execute 不再消费队列,直接 await executor.execute() 获取
返回值(completed_data/steps/errors),队列仅由 GET SSE 端点消费。
executor.execute() 现在返回执行结果 dict 用于记录持久化。

问题2: StepNode 组件混合 border 简写和 borderColor 导致 React CSS 冲突警告。
修复: nodeErrorStyle 和 isOpen border 改用完整的 border 简写。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 1 týždeň pred
rodič
commit
0515c791d2

+ 31 - 12
core/debug/executor.py

@@ -190,13 +190,12 @@ class DebugExecutor:
         self.base_reviewer = BaseReviewer()
 
     async def execute(self, request: DebugExecuteRequest,
-                      event_queue: asyncio.Queue) -> None:
+                      event_queue: asyncio.Queue) -> dict:
         """
         执行审查调试,所有事件通过 event_queue 推送。
 
-        Args:
-            request: 调试执行请求
-            event_queue: asyncio.Queue,消费者消费后格式化为 SSE
+        Returns:
+            dict with keys: completed_data, steps_collected, error_occurred, record_id
         """
         chain_id = request.chain_id
         chain_cfg = CHAIN_CONFIG[chain_id]
@@ -282,21 +281,41 @@ class DebugExecutor:
         # --- completed 事件 ---
         total_duration = sum(s.duration for s in step_results)
         final_result = self._build_final_result(step_results)
+        error_occurred = None
+        for s in step_results:
+            if s.status == "error":
+                error_occurred = s.error or f"步骤 {s.index} ({s.name}) 执行失败"
+                break
 
-        # 持久化调用记录(非关键路径,失败不阻止 normal 流程)
-        try:
-            self._save_record(request, trace_id, record_id,
-                              chain_cfg, step_results, total_duration)
-        except Exception as exc:
-            logger.warning(f"[DebugExecutor] 保存调用记录失败: {exc}")
+        # 转换为可序列化的步骤列表
+        steps_collected = [
+            {
+                "index": s.index,
+                "name": s.name,
+                "status": s.status,
+                "duration_ms": int(s.duration * 1000) if s.duration else 0,
+                "input": s.input_summary,
+                "output": s.output_summary,
+            }
+            for s in step_results
+        ]
 
-        await event_queue.put(("completed", {
+        completed_data = {
             "task_id": trace_id,
             "chain_id": chain_id,
             "total_duration": round(total_duration, 3),
             "record_id": record_id,
             "final_result": final_result,
-        }))
+        }
+
+        await event_queue.put(("completed", completed_data))
+
+        return {
+            "completed_data": completed_data,
+            "steps_collected": steps_collected,
+            "error_occurred": error_occurred,
+            "record_id": record_id,
+        }
 
     # ----------------------------------------------------------
     # 直调 LLM 链路(6 个)

+ 2 - 2
frontend/src/components/StepNode.tsx

@@ -18,7 +18,7 @@ const nodeStyle: React.CSSProperties = {
 };
 
 const nodeErrorStyle: React.CSSProperties = {
-  borderColor: '#fca5a5',
+  border: '1px solid #fca5a5',
 };
 
 const headerStyle: React.CSSProperties = {
@@ -184,7 +184,7 @@ export default function StepNode({
   const currentBorderStyle: React.CSSProperties = {
     ...nodeStyle,
     ...(step.status === 'error' ? nodeErrorStyle : {}),
-    ...(isOpen ? { borderColor: '#c0c0c8' } : {}),
+    ...(isOpen ? { border: '1px solid #c0c0c8' } : {}),
   };
 
   const stepNameColor: React.CSSProperties = {

+ 22 - 62
views/debug/debug_api.py

@@ -432,63 +432,37 @@ async def _background_execute(
     record_id: str,
     chain_id: str,
 ) -> None:
-    """后台执行审查调试,通过 event_queue 推送进度"""
-    async with debug_semaphore:
-        completed_data = None
-        error_occurred = None
-        steps_collected: List[dict] = []
+    """后台执行审查调试,通过 event_queue 推送进度给 SSE 端点。
 
+    不消费 event_queue —— 仅启动 executor 并等待其完成,
+    将结果持久化到调用记录。SSE 事件由 GET stream 端点独立消费。
+    """
+    async with debug_semaphore:
         try:
-            # 发送 started 事件
+            from core.debug.executor import DebugExecutor
+
             await event_queue.put(("started", {
                 "task_id": task_id,
                 "chain_id": chain_id,
                 "total_steps": CHAIN_STEPS_COUNT.get(chain_id, 3),
             }))
 
-            # 启动执行器
-            exec_task = asyncio.create_task(
-                _run_debug_execution(request, event_queue, task_id, record_id)
-            )
-
-            # 消费队列事件
-            while True:
-                try:
-                    event_type, data = await asyncio.wait_for(
-                        event_queue.get(),
-                        timeout=DEBUG_GLOBAL_TIMEOUT,
-                    )
-                except asyncio.TimeoutError:
-                    error_occurred = "执行超时"
-                    await event_queue.put(("error", {
-                        "task_id": task_id,
-                        "message": f"执行超时 (>{DEBUG_GLOBAL_TIMEOUT}s)",
-                    }))
-                    exec_task.cancel()
-                    break
-
-                if event_type == "__end__":
-                    break
-
-                if event_type == "started":
-                    continue
-
-                if event_type == "step_result":
-                    steps_collected.append({
-                        "index": data.get("step_index"),
-                        "name": data.get("step_name"),
-                        "status": data.get("status"),
-                        "duration_ms": int((data.get("duration") or 0) * 1000),
-                        "input": data.get("input", {}),
-                        "output": data.get("output", {}),
-                    })
-
-                if event_type == "error":
-                    error_occurred = data.get("message", "未知错误")
+            executor = DebugExecutor()
+            result = await executor.execute(request, event_queue)
 
-                if event_type == "completed":
-                    completed_data = data
-                    break
+            # 保存调用记录
+            try:
+                await _save_debug_record(
+                    request=request,
+                    task_id=task_id,
+                    record_id=record_id,
+                    chain_id=chain_id,
+                    completed_data=result.get("completed_data"),
+                    error_message=result.get("error_occurred"),
+                    steps=result.get("steps_collected", []),
+                )
+            except Exception as exc:
+                logger.warning("[execute_review] 保存调用记录失败: %s", exc)
 
         except asyncio.CancelledError:
             await event_queue.put(("error", {
@@ -504,20 +478,6 @@ async def _background_execute(
         finally:
             _running_tasks.pop(task_id, None)
 
-            # 自动保存调用记录
-            try:
-                await _save_debug_record(
-                    request=request,
-                    task_id=task_id,
-                    record_id=record_id,
-                    chain_id=chain_id,
-                    completed_data=completed_data,
-                    error_message=error_occurred,
-                    steps=steps_collected,
-                )
-            except Exception as exc:
-                logger.warning("[execute_review] 保存调用记录失败: %s", exc)
-
 
 # ============ SSE 断线重连端点 ============