"""Call-record management API endpoints.

Provides querying, detail lookup, replay and export for the historical call
records produced by review debugging.

Routes:
    1. GET  /api/call-records              - paginated listing with chain /
       status / time-range / keyword filters
    2. GET  /api/call-records/{record_id}  - detail of a single record
    3. POST /api/call-records/replay       - replay (SSE stream, including a
       ``replay_comparison`` event)
    4. POST /api/call-records/export       - bulk export as a downloadable
       JSON file
"""

import asyncio
import copy
import json
import logging
from datetime import datetime
from typing import Optional, List

from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import Response, StreamingResponse

from core.debug.record_manager import RecordManager
from core.debug.sse_utils import (
    format_sse_event,
    debug_semaphore,
    MAX_CONCURRENT_DEBUG_TASKS,
    DEBUG_GLOBAL_TIMEOUT,
    make_trace_id,
    make_record_id,
    CHAIN_NAMES,
    CHAIN_STEPS_COUNT,
)
from views.debug.debug_api import (
    DebugExecuteRequest,
    ReplayRequest,
    ExportRequest,
)

logger = logging.getLogger(__name__)

# Module-wide RecordManager instance, created lazily by _get_manager().
_record_manager: Optional[RecordManager] = None


def _get_manager() -> RecordManager:
    """Return the module-wide RecordManager, creating it on first use."""
    global _record_manager
    if _record_manager is None:
        _record_manager = RecordManager()
    return _record_manager


def register_routes(router: APIRouter) -> None:
    """Register the call-record management routes on ``router``.

    Args:
        router: The APIRouter that receives the four call-record endpoints
            (list, detail, replay, export).
    """

    @router.get(
        "/api/call-records",
        summary="查询调用记录列表",
        description="分页查询历史调用记录,支持链路筛选、状态筛选、时间范围筛选和关键词搜索。",
    )
    async def list_call_records(
        chain: Optional[str] = Query(default=None, description="链路筛选"),
        status: Optional[str] = Query(
            default=None, description="状态筛选: succ / fail / timeout"
        ),
        time_range: Optional[str] = Query(
            default=None, description="时间范围: today / 7d / 30d / all"
        ),
        search: Optional[str] = Query(
            default=None, description="关键词搜索(匹配文档名/ID)"
        ),
        page: int = Query(default=1, ge=1, description="页码"),
        page_size: int = Query(default=20, ge=1, le=100, description="每页条数"),
        sort_by: str = Query(default="time", description="排序字段: time / duration"),
        sort_order: str = Query(default="desc", description="排序方向: desc / asc"),
    ):
        """Return one page of historical call records matching the filters.

        The manager's result is returned with two extra keys: ``chains``
        (the fixed list of chain display names) and ``status`` (``"ok"``).

        Raises:
            HTTPException: 500 when the record manager raises.
        """
        try:
            manager = _get_manager()
            result = await manager.list_records(
                chain_id=chain,
                status=status,
                # An omitted time range means "no time restriction".
                time_range=time_range or "all",
                search=search,
                sort_field=sort_by,
                sort_order=sort_order,
                page=page,
                page_size=page_size,
            )
            # Attach the full list of chain display names.
            result["chains"] = [
                "完整性",
                "时效性",
                "规范性",
                "敏感词",
                "语义逻辑",
                "语法",
                "专业性",
            ]
            result["status"] = "ok"
            return result
        except Exception as e:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("查询调用记录列表失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e))

    @router.get(
        "/api/call-records/{record_id}",
        summary="获取调用详情",
        description="获取单条调用记录的完整详情,包括输入参数、各步骤输入输出、最终结果。",
    )
    async def get_call_record_detail(record_id: str):
        """Return the stored record identified by ``record_id``.

        Raises:
            HTTPException: 404 when the manager returns ``None`` for the id,
                500 when the lookup itself raises.
        """
        try:
            manager = _get_manager()
            record = await manager.get_record(record_id)
            if record is None:
                raise HTTPException(
                    status_code=404,
                    detail=f"调用记录不存在: {record_id}",
                )
            return record
        except HTTPException:
            # Let the 404 above through instead of converting it to a 500.
            raise
        except Exception as e:
            logger.exception("获取调用详情失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e))

    @router.post(
        "/api/call-records/replay",
        summary="回放调用",
        description="用历史调用的输入参数重新执行审查,支持覆盖模型和提示词版本。返回 SSE 流式响应。",
    )
    async def replay_call_record(request: ReplayRequest):
        """Re-run a review using the inputs of a historical call record.

        Returns an SSE streaming response. When the run completes or fails
        (including a timeout), the replay is saved as a new record and the
        stream ends with a ``replay_comparison`` event followed by a
        ``completed`` event carrying ``replay_record_id``.

        Raises:
            HTTPException: 429 when the debug semaphore is already locked.
        """
        # ---- Concurrency limit check ----
        if debug_semaphore.locked():
            raise HTTPException(
                status_code=429,
                detail=(
                    f"并发调试任务数已达上限 ({MAX_CONCURRENT_DEBUG_TASKS}),"
                    f"请等待其他任务完成后再试"
                ),
            )

        async def event_generator():
            """Yield the SSE events of one replay run."""
            async with debug_semaphore:
                manager = _get_manager()
                original = await manager.get_record(request.record_id)
                if original is None:
                    yield format_sse_event("error", {
                        "message": f"调用记录不存在: {request.record_id}",
                    })
                    return

                chain_id = original.get("chain", "completeness")
                total_steps = CHAIN_STEPS_COUNT.get(chain_id, 3)
                task_id = make_trace_id(chain_id)
                replay_record_id = make_record_id()

                # Build the execution parameters, applying any overrides.
                # "or {}" also covers keys stored with an explicit None.
                override = request.override_params
                exec_params = original.get("params") or {}
                stored_exec = original.get("execution_params") or {}

                # Deep copy so the replay can never mutate the original record.
                manual_inputs = copy.deepcopy(stored_exec.get("manual_inputs") or {})
                isolation_mode = stored_exec.get("isolation_mode", False)
                isolation_steps = list(stored_exec.get("isolation_steps") or [])

                # Build the DebugExecuteRequest; an override wins only when set.
                exec_request = DebugExecuteRequest(
                    chain_id=chain_id,
                    content=exec_params.get("review_content", ""),
                    reference=exec_params.get("review_references", ""),
                    model=(
                        override.model
                        if override and override.model is not None
                        else exec_params.get("model_override")
                    ),
                    function_name=exec_params.get("function_name", ""),
                    prompt_version=(
                        override.prompt_version
                        if override and override.prompt_version is not None
                        else original.get("prompt_ver")
                    ),
                    timeout=exec_params.get("timeout", 60),
                    rag_params=(
                        override.rag_params
                        if override and override.rag_params is not None
                        else stored_exec.get("rag_params")
                    ),
                    isolation_mode=isolation_mode,
                    isolation_steps=isolation_steps,
                    manual_inputs=manual_inputs,
                )

                # Emit our own "started" event, flagged as a replay.
                yield format_sse_event("started", {
                    "task_id": task_id,
                    "record_id": request.record_id,
                    "is_replay": True,
                    "chain_id": chain_id,
                    "total_steps": total_steps,
                })

                # ---- Run the replay ----
                event_queue: asyncio.Queue = asyncio.Queue()
                exec_task = asyncio.create_task(
                    _run_replay_execution(exec_request, event_queue, task_id)
                )

                completed_data = None
                steps_collected: List[dict] = []
                replay_error = None

                try:
                    while True:
                        try:
                            event_type, data = await asyncio.wait_for(
                                event_queue.get(),
                                timeout=DEBUG_GLOBAL_TIMEOUT,
                            )
                        except asyncio.TimeoutError:
                            # Remember the timeout so the replay is saved with
                            # status "timeout" and still gets a comparison.
                            replay_error = f"回放执行超时 (>{DEBUG_GLOBAL_TIMEOUT}s)"
                            yield format_sse_event("error", {
                                "task_id": task_id,
                                "message": replay_error,
                            })
                            exec_task.cancel()
                            break

                        if event_type == "__end__":
                            break

                        if event_type == "started":
                            # Skip the executor's own "started" event; ours
                            # was already sent above.
                            continue

                        if event_type == "error":
                            replay_error = data.get("message") or "回放执行错误"
                            yield format_sse_event(event_type, data)
                            break

                        if event_type == "step_result":
                            steps_collected.append({
                                "index": data.get("step_index"),
                                "name": data.get("step_name"),
                                "status": data.get("status"),
                                # "duration" is multiplied by 1000 to get ms.
                                "duration_ms": int((data.get("duration") or 0) * 1000),
                                "input": data.get("input", {}),
                                "output": data.get("output", {}),
                            })

                        if event_type == "completed":
                            completed_data = data
                            yield format_sse_event(event_type, data)
                            break

                        yield format_sse_event(event_type, data)

                except asyncio.CancelledError:
                    # The consumer went away. Propagate the cancellation
                    # instead of yielding to a closed connection; the
                    # finally clause stops the executor task.
                    logger.info("[replay] 回放连接已断开: %s", task_id)
                    raise
                except Exception as exc:
                    logger.exception("[replay] 事件处理异常")
                    yield format_sse_event("error", {
                        "task_id": task_id,
                        "message": str(exc),
                    })
                finally:
                    if not exec_task.done():
                        exec_task.cancel()

                # ---- Compute the diff and persist the replay record ----
                if completed_data is not None or replay_error:
                    replay_status = "succ"
                    if replay_error:
                        replay_status = "timeout" if "超时" in replay_error else "fail"

                    finished = completed_data or {}
                    replay_duration = finished.get("total_duration") or 0
                    final_result = finished.get("final_result") or {}
                    replay_result_summary = ""
                    if completed_data and isinstance(final_result, dict):
                        replay_result_summary = str(final_result.get("summary", ""))

                    # One timestamp shared by the saved record and the
                    # comparison payload.
                    replay_time = datetime.now().isoformat()

                    # Persist the replay as a new record.
                    replay_record_data = {
                        "id": replay_record_id,
                        "time": replay_time,
                        "chain": chain_id,
                        "chain_name": CHAIN_NAMES.get(chain_id, ""),
                        "doc_ref": original.get("doc_ref", ""),
                        "status": replay_status,
                        "duration_ms": int(replay_duration * 1000),
                        "model": exec_request.model or "default",
                        "prompt_ver": exec_request.prompt_version or "",
                        "tokens": 0,
                        "params": exec_params,
                        "execution_params": {
                            "isolation_mode": isolation_mode,
                            "isolation_steps": isolation_steps,
                        },
                        "steps": steps_collected,
                        "result": replay_result_summary,
                        "error_message": replay_error,
                    }

                    try:
                        await manager.save_record(replay_record_data)
                    except Exception as exc:
                        # Best effort: a failed save must not abort the stream.
                        logger.warning("[replay] 保存回放记录失败: %s", exc)

                    # Build the comparison data.
                    diff = _compute_replay_diff(original, replay_record_data)

                    # Emit the replay_comparison event.
                    yield format_sse_event("replay_comparison", {
                        "record_id": request.record_id,
                        "original": {
                            "time": original.get("time", ""),
                            "model": original.get("model", ""),
                            "prompt_ver": original.get("prompt_ver", ""),
                            "duration_ms": original.get("duration_ms", 0),
                            "status": original.get("status", ""),
                            "result_summary": _as_text(original.get("result"))[:200],
                        },
                        "replay": {
                            "time": replay_time,
                            "model": exec_request.model or "default",
                            "prompt_ver": exec_request.prompt_version or "",
                            "duration_ms": int(replay_duration * 1000),
                            "status": replay_status,
                            "result_summary": replay_result_summary[:200],
                        },
                        "diffs": diff,
                    })

                    # Emit a final "completed" event that adds replay_record_id.
                    yield format_sse_event("completed", {
                        "task_id": task_id,
                        "record_id": request.record_id,
                        "replay_record_id": replay_record_id,
                        "total_duration": round(replay_duration, 3),
                        "final_result": (
                            completed_data.get("final_result", {})
                            if completed_data
                            else {}
                        ),
                    })

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    @router.post(
        "/api/call-records/export",
        summary="导出调用记录",
        description="导出选中的调用记录为 JSON 文件。",
    )
    async def export_call_records(request: ExportRequest):
        """Export the selected call records as a downloadable JSON file.

        Raises:
            HTTPException: 404 when the manager returns nothing for the
                requested ids, 500 when the export itself raises.
        """
        try:
            manager = _get_manager()
            records_data = await manager.export_records(request.record_ids)

            if not records_data:
                raise HTTPException(
                    status_code=404,
                    detail="未找到任何调用记录",
                )

            content = json.dumps(records_data, ensure_ascii=False, indent=2)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            return Response(
                content=content,
                media_type="application/json",
                headers={
                    "Content-Disposition": (
                        f'attachment; filename="call_records_export_{timestamp}.json"'
                    ),
                },
            )
        except HTTPException:
            # Let the 404 above through instead of converting it to a 500.
            raise
        except Exception as e:
            logger.exception("导出调用记录失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e))


# ============================================================
# Internal helpers
# ============================================================

def _as_text(value) -> str:
    """Return ``value`` as a string, mapping any falsy value to ``""``."""
    if not value:
        return ""
    return value if isinstance(value, str) else str(value)


async def _run_replay_execution(
    request: DebugExecuteRequest,
    event_queue: asyncio.Queue,
    task_id: str,
) -> None:
    """Run the replay through DebugExecutor, reporting failures on the queue.

    Args:
        request: The execution request built from the original record.
        event_queue: Queue passed to the executor; on failure or
            cancellation an ``("error", {...})`` tuple is put on it.
        task_id: Trace id included in the error payloads.

    Raises:
        asyncio.CancelledError: Re-raised after the cancellation notice has
            been queued, so the task ends up in the cancelled state.
    """
    try:
        # Imported here rather than at module level, as in the original.
        from core.debug.executor import DebugExecutor

        executor = DebugExecutor()
        await executor.execute(request, event_queue)
    except asyncio.CancelledError:
        await event_queue.put(("error", {
            "task_id": task_id,
            "message": "回放执行被取消",
        }))
        raise
    except Exception as exc:
        logger.exception("[_run_replay_execution] 执行异常")
        await event_queue.put(("error", {
            "task_id": task_id,
            "message": str(exc),
        }))


def _compute_replay_diff(original: dict, replay: dict) -> dict:
    """Compute the differences between an original record and its replay.

    Duration (only when it differs by more than 100 ms), status, result
    summary and model are compared; each difference becomes an entry in
    ``changed_items``.

    Args:
        original: The original call record.
        replay: The replay call record.

    Returns:
        A dict with keys ``added_items``, ``removed_items`` and
        ``changed_items``. This implementation only ever fills
        ``changed_items``; the other two lists are always empty.
    """
    added_items: list = []
    removed_items: list = []
    changed_items: list = []

    # Duration: differences of 100 ms or less are not reported.
    orig_dur = original.get("duration_ms", 0) or 0
    replay_dur = replay.get("duration_ms", 0) or 0
    if abs(orig_dur - replay_dur) > 100:
        changed_items.append({
            "field": "执行耗时",
            "original": f"{orig_dur}ms",
            "replay": f"{replay_dur}ms",
        })

    # Status
    orig_status = original.get("status", "")
    replay_status = replay.get("status", "")
    if orig_status != replay_status:
        changed_items.append({
            "field": "执行状态",
            "original": orig_status,
            "replay": replay_status,
        })

    # Result summary: coerce to text so a non-string stored result cannot
    # break .strip() or slicing.
    orig_result = _as_text(original.get("result")).strip()
    replay_result = _as_text(replay.get("result")).strip()
    if orig_result != replay_result:
        changed_items.append({
            "field": "审查摘要",
            "original": orig_result[:200] if orig_result else "(无)",
            "replay": replay_result[:200] if replay_result else "(无)",
        })

    # Model
    orig_model = original.get("model", "")
    replay_model = replay.get("model", "")
    if orig_model != replay_model:
        changed_items.append({
            "field": "模型",
            "original": orig_model,
            "replay": replay_model,
        })

    return {
        "added_items": added_items,
        "removed_items": removed_items,
        "changed_items": changed_items,
    }