| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488 |
"""
调用记录管理 API 端点

提供审查调试历史调用记录的查询、详情、回放和导出功能。

功能:
1. GET /api/call-records — 分页查询,支持链路/状态/时间范围/关键词筛选
2. GET /api/call-records/{record_id} — 单条记录详情
3. POST /api/call-records/replay — 回放(SSE 流,含 replay_comparison 事件)
4. POST /api/call-records/export — 批量导出 JSON 文件下载
"""
import asyncio
import copy
import json
import logging
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Query
from fastapi.encoders import jsonable_encoder
from fastapi.responses import Response, StreamingResponse

from core.debug.record_manager import RecordManager
from core.debug.sse_utils import (
    format_sse_event,
    debug_semaphore,
    MAX_CONCURRENT_DEBUG_TASKS,
    DEBUG_GLOBAL_TIMEOUT,
    make_trace_id,
    make_record_id,
    CHAIN_NAMES,
    CHAIN_STEPS_COUNT,
)
from views.debug.debug_api import (
    DebugExecuteRequest,
    ReplayRequest,
    ExportRequest,
)
logger = logging.getLogger(__name__)

# Process-wide RecordManager, created lazily on first use by _get_manager().
_record_manager: Optional[RecordManager] = None


def _get_manager() -> RecordManager:
    """Return the shared RecordManager, constructing it the first time it is needed."""
    global _record_manager
    if _record_manager is not None:
        return _record_manager
    _record_manager = RecordManager()
    return _record_manager
def register_routes(router: APIRouter):
    """Register the call-record management endpoints on ``router``.

    Endpoints:
        GET  /api/call-records              paginated, filterable record list
        GET  /api/call-records/{record_id}  full detail of one record
        POST /api/call-records/replay       re-run a record, streamed as SSE
        POST /api/call-records/export       download selected records as JSON
    """

    @router.get(
        "/api/call-records",
        summary="查询调用记录列表",
        description="分页查询历史调用记录,支持链路筛选、状态筛选、时间范围筛选和关键词搜索。",
    )
    async def list_call_records(
        chain: Optional[str] = Query(default=None, description="链路筛选"),
        status: Optional[str] = Query(default=None, description="状态筛选: succ / fail / timeout"),
        time_range: Optional[str] = Query(default=None, description="时间范围: today / 7d / 30d / all"),
        search: Optional[str] = Query(default=None, description="关键词搜索(匹配文档名/ID)"),
        page: int = Query(default=1, ge=1, description="页码"),
        page_size: int = Query(default=20, ge=1, le=100, description="每页条数"),
        sort_by: str = Query(default="time", description="排序字段: time / duration"),
        sort_order: str = Query(default="desc", description="排序方向: desc / asc"),
    ):
        """Return one page of call records matching the optional filters.

        The dict returned by ``RecordManager.list_records`` is passed through
        with two keys added: ``chains`` (the fixed chain-label list used by the
        filter UI) and ``status="ok"``. A missing ``time_range`` means "all".

        Raises:
            HTTPException: 500 if listing fails.
        """
        try:
            result = await _get_manager().list_records(
                chain_id=chain,
                status=status,
                time_range=time_range or "all",
                search=search,
                sort_field=sort_by,
                sort_order=sort_order,
                page=page,
                page_size=page_size,
            )
            # Fresh list per response so no caller can mutate the shared constant.
            result["chains"] = list(_CHAIN_FILTER_LABELS)
            result["status"] = "ok"
            return result
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("查询调用记录列表失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e

    @router.get(
        "/api/call-records/{record_id}",
        summary="获取调用详情",
        description="获取单条调用记录的完整详情,包括输入参数、各步骤输入输出、最终结果。",
    )
    async def get_call_record_detail(record_id: str):
        """Return the stored record for ``record_id`` as-is.

        Raises:
            HTTPException: 404 if no such record exists, 500 if lookup fails.
        """
        try:
            record = await _get_manager().get_record(record_id)
        except Exception as e:
            logger.exception("获取调用详情失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e
        if record is None:
            raise HTTPException(
                status_code=404,
                detail=f"调用记录不存在: {record_id}",
            )
        return record

    @router.post(
        "/api/call-records/replay",
        summary="回放调用",
        description="用历史调用的输入参数重新执行审查,支持覆盖模型和提示词版本。返回 SSE 流式响应。",
    )
    async def replay_call_record(request: ReplayRequest):
        """Re-run a recorded call with its original inputs, streamed as SSE.

        ``request.override_params`` may replace the model, prompt version and
        RAG params. The event sequence is produced by ``_replay_event_stream``.

        Raises:
            HTTPException: 429 when every debug slot is already taken.
        """
        # Best-effort fast fail: the slot itself is only acquired once the
        # stream starts, so a request passing this check can still wait
        # briefly on the semaphore if another task grabs the last slot first.
        if debug_semaphore.locked():
            raise HTTPException(
                status_code=429,
                detail=(
                    f"并发调试任务数已达上限 ({MAX_CONCURRENT_DEBUG_TASKS}),"
                    f"请等待其他任务完成后再试"
                ),
            )
        return StreamingResponse(
            _replay_event_stream(request),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    @router.post(
        "/api/call-records/export",
        summary="导出调用记录",
        description="导出选中的调用记录为 JSON 文件。",
    )
    async def export_call_records(request: ExportRequest):
        """Download the requested records as a pretty-printed JSON attachment.

        Raises:
            HTTPException: 404 if none of the ids resolve to a record,
                500 if loading or serialising the records fails.
        """
        try:
            records_data = await _get_manager().export_records(request.record_ids)
        except Exception as e:
            logger.exception("导出调用记录失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e
        if not records_data:
            raise HTTPException(
                status_code=404,
                detail="未找到任何调用记录",
            )
        try:
            content = json.dumps(records_data, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.exception("导出调用记录失败: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return Response(
            content=content,
            media_type="application/json",
            headers={
                "Content-Disposition": (
                    f'attachment; filename="call_records_export_{timestamp}.json"'
                ),
            },
        )


# Chain labels returned with every list response for the filter UI.
# NOTE(review): these presumably mirror CHAIN_NAMES' values — confirm, and
# derive them from CHAIN_NAMES if so, so the two cannot drift apart.
_CHAIN_FILTER_LABELS = (
    "完整性", "时效性", "规范性",
    "敏感词", "语义逻辑", "语法", "专业性",
)


def _as_text(value) -> str:
    """Return ``value`` as a string; falsy values (None, "", {}) become ""."""
    return str(value) if value else ""


def _summarize_step(data: dict) -> dict:
    """Condense one executor ``step_result`` payload into the stored step shape."""
    return {
        "index": data.get("step_index"),
        "name": data.get("step_name"),
        "status": data.get("status"),
        "duration_ms": int((data.get("duration") or 0) * 1000),
        "input": data.get("input", {}),
        "output": data.get("output", {}),
    }


def _build_replay_exec_request(original: dict, override, chain_id: str) -> DebugExecuteRequest:
    """Build the DebugExecuteRequest that re-runs ``original``.

    Each of ``override.model`` / ``override.prompt_version`` /
    ``override.rag_params`` replaces the recorded value only when ``override``
    is given and that attribute is not None.
    """
    # `or {}` tolerates records that stored these sections as null.
    params = original.get("params") or {}
    extra = original.get("execution_params") or {}

    def pick(field: str, fallback):
        if override is not None:
            value = getattr(override, field)
            if value is not None:
                return value
        return fallback

    return DebugExecuteRequest(
        chain_id=chain_id,
        content=params.get("review_content", ""),
        reference=params.get("review_references", ""),
        model=pick("model", params.get("model_override")),
        function_name=params.get("function_name", ""),
        prompt_version=pick("prompt_version", original.get("prompt_ver")),
        timeout=params.get("timeout", 60),
        rag_params=pick("rag_params", extra.get("rag_params")),
        isolation_mode=extra.get("isolation_mode", False),
        isolation_steps=list(extra.get("isolation_steps") or []),
        # A real deep copy (the old code's comment promised one but used dict()),
        # so the executor cannot mutate nested inputs of the stored record.
        manual_inputs=copy.deepcopy(extra.get("manual_inputs") or {}),
    )


async def _replay_event_stream(request: ReplayRequest):
    """Yield the SSE events of one replay of ``request.record_id``.

    Sequence:
      1. A replay-flavoured ``started`` event.
      2. The executor's events. Its own ``started`` and ``completed`` are not
         forwarded.
      3. On success, failure or timeout: a ``replay_comparison`` event and then
         exactly one final ``completed`` event carrying ``replay_record_id``.

    An unknown record id yields a single ``error`` event instead. The debug
    semaphore is held for the whole stream.
    """
    async with debug_semaphore:
        manager = _get_manager()
        original = await manager.get_record(request.record_id)
        if original is None:
            yield format_sse_event("error", {
                "message": f"调用记录不存在: {request.record_id}",
            })
            return

        chain_id = original.get("chain", "completeness")
        task_id = make_trace_id(chain_id)
        replay_record_id = make_record_id()
        exec_request = _build_replay_exec_request(
            original, request.override_params, chain_id
        )

        yield format_sse_event("started", {
            "task_id": task_id,
            "record_id": request.record_id,
            "is_replay": True,
            "chain_id": chain_id,
            "total_steps": CHAIN_STEPS_COUNT.get(chain_id, 3),
        })

        loop = asyncio.get_running_loop()
        started_at = loop.time()
        event_queue: asyncio.Queue = asyncio.Queue()
        exec_task = asyncio.create_task(
            _run_replay_execution(exec_request, event_queue, task_id)
        )

        completed_data: Optional[dict] = None
        steps_collected: List[dict] = []
        replay_error: Optional[str] = None
        try:
            while True:
                try:
                    event_type, data = await asyncio.wait_for(
                        event_queue.get(),
                        timeout=DEBUG_GLOBAL_TIMEOUT,
                    )
                except asyncio.TimeoutError:
                    # Record the timeout so it is saved and compared like any
                    # other failure (previously it was silently dropped).
                    replay_error = f"回放执行超时 (>{DEBUG_GLOBAL_TIMEOUT}s)"
                    yield format_sse_event("error", {
                        "task_id": task_id,
                        "message": replay_error,
                    })
                    break
                if event_type == "__end__":
                    break
                if event_type == "started":
                    continue  # superseded by the replay-flavoured started above
                if event_type == "completed":
                    # Held back and merged into the single final completed
                    # event, so that it arrives after replay_comparison.
                    completed_data = data
                    break
                if event_type == "error":
                    replay_error = data.get("message") or "回放执行错误"
                    yield format_sse_event(event_type, data)
                    break
                if event_type == "step_result":
                    steps_collected.append(_summarize_step(data))
                yield format_sse_event(event_type, data)
        except asyncio.CancelledError:
            # The client went away, so nothing more can be delivered. Let the
            # cancellation propagate instead of swallowing it.
            raise
        except Exception as exc:
            logger.exception("[replay] 事件处理异常")
            replay_error = str(exc) or "回放执行错误"
            yield format_sse_event("error", {
                "task_id": task_id,
                "message": str(exc),
            })
        finally:
            if not exec_task.done():
                exec_task.cancel()

        if completed_data is None and replay_error is None:
            # Executor ended without a terminal event: nothing to save or compare.
            return

        if replay_error is not None:
            replay_status = "timeout" if "超时" in replay_error else "fail"
        else:
            replay_status = "succ"

        if completed_data is not None:
            replay_duration = completed_data.get("total_duration") or 0
            final_result = completed_data.get("final_result") or {}
            replay_result_summary = str(final_result.get("summary", ""))
        else:
            # No executor timing available: fall back to wall-clock time
            # instead of reporting 0 for a run that actually took time.
            replay_duration = loop.time() - started_at
            final_result = {}
            replay_result_summary = ""

        # Persist the parameters that were actually run (overrides included),
        # so replaying this replay reproduces it.
        params_used = dict(original.get("params") or {})
        if exec_request.model is not None:
            params_used["model_override"] = exec_request.model

        replay_time = datetime.now().isoformat()
        replay_record_data = {
            "id": replay_record_id,
            "time": replay_time,
            "chain": chain_id,
            "chain_name": CHAIN_NAMES.get(chain_id, ""),
            "doc_ref": original.get("doc_ref", ""),
            "status": replay_status,
            "duration_ms": int(replay_duration * 1000),
            "model": exec_request.model or "default",
            "prompt_ver": exec_request.prompt_version or "",
            "tokens": 0,
            "params": params_used,
            # jsonable_encoder: an override's rag_params may be a pydantic model.
            "execution_params": jsonable_encoder({
                "isolation_mode": exec_request.isolation_mode,
                "isolation_steps": exec_request.isolation_steps,
                "manual_inputs": exec_request.manual_inputs,
                "rag_params": exec_request.rag_params,
            }),
            "steps": steps_collected,
            "result": replay_result_summary,
            "error_message": replay_error,
        }
        try:
            await manager.save_record(replay_record_data)
        except Exception as exc:
            logger.warning("[replay] 保存回放记录失败: %s", exc)

        yield format_sse_event("replay_comparison", {
            "record_id": request.record_id,
            "original": {
                "time": original.get("time", ""),
                "model": original.get("model", ""),
                "prompt_ver": original.get("prompt_ver", ""),
                "duration_ms": original.get("duration_ms", 0),
                "status": original.get("status", ""),
                "result_summary": _as_text(original.get("result"))[:200],
            },
            "replay": {
                "time": replay_time,
                "model": replay_record_data["model"],
                "prompt_ver": replay_record_data["prompt_ver"],
                "duration_ms": replay_record_data["duration_ms"],
                "status": replay_status,
                "result_summary": replay_result_summary[:200],
            },
            "diffs": _compute_replay_diff(original, replay_record_data),
        })

        # Single terminal event: the executor's completed fields (if any),
        # overlaid with the replay identifiers.
        final_payload = dict(completed_data or {})
        final_payload.update({
            "task_id": task_id,
            "record_id": request.record_id,
            "replay_record_id": replay_record_id,
            "total_duration": round(replay_duration, 3),
            "final_result": final_result,
        })
        yield format_sse_event("completed", final_payload)
# ============================================================
# 内部辅助函数
# ============================================================
async def _run_replay_execution(
    request: DebugExecuteRequest,
    event_queue: asyncio.Queue,
    task_id: str,
) -> None:
    """Run one replay with ``DebugExecutor``, which publishes its events to ``event_queue``.

    If the executor raises, a single ``("error", {"task_id", "message"})``
    event is queued instead. On cancellation an ``error`` event is queued and
    the cancellation is re-raised rather than swallowed. In every case a
    final ``("__end__", {})`` sentinel is queued. A consumer that has not yet
    seen a terminal event therefore stops waiting right away, instead of
    sitting out its own timeout.

    Args:
        request: the execution request to run.
        event_queue: queue the executor and this wrapper publish events to.
        task_id: id stamped on the error events emitted here.
    """
    try:
        # Imported lazily, as before — presumably to avoid an import cycle.
        from core.debug.executor import DebugExecutor
        executor = DebugExecutor()
        await executor.execute(request, event_queue)
    except asyncio.CancelledError:
        await event_queue.put(("error", {
            "task_id": task_id,
            "message": "回放执行被取消",
        }))
        raise
    except Exception as exc:
        logger.exception("[_run_replay_execution] 执行异常")
        await event_queue.put(("error", {
            "task_id": task_id,
            "message": str(exc),
        }))
    finally:
        # Harmless if the executor already emitted its own terminal event:
        # the consumer stops reading at the first terminal event it sees.
        await event_queue.put(("__end__", {}))
- def _compute_replay_diff(original: dict, replay: dict) -> dict:
- """
- 计算原始记录和回放结果的差异。
- Args:
- original: 原始调用记录
- replay: 回放调用记录
- Returns:
- diff 结构: {added_items, removed_items, changed_items}
- """
- added_items: list = []
- removed_items: list = []
- changed_items: list = []
- # 比较时长
- orig_dur = original.get("duration_ms", 0) or 0
- replay_dur = replay.get("duration_ms", 0) or 0
- if abs(orig_dur - replay_dur) > 100:
- changed_items.append({
- "field": "执行耗时",
- "original": f"{orig_dur}ms",
- "replay": f"{replay_dur}ms",
- })
- # 比较状态
- orig_status = original.get("status", "")
- replay_status = replay.get("status", "")
- if orig_status != replay_status:
- changed_items.append({
- "field": "执行状态",
- "original": orig_status,
- "replay": replay_status,
- })
- # 比较结果摘要
- orig_result = (original.get("result", "") or "").strip()
- replay_result = (replay.get("result", "") or "").strip()
- if orig_result != replay_result:
- changed_items.append({
- "field": "审查摘要",
- "original": orig_result[:200] if orig_result else "(无)",
- "replay": replay_result[:200] if replay_result else "(无)",
- })
- # 比较模型
- orig_model = original.get("model", "")
- replay_model = replay.get("model", "")
- if orig_model != replay_model:
- changed_items.append({
- "field": "模型",
- "original": orig_model,
- "replay": replay_model,
- })
- return {
- "added_items": added_items,
- "removed_items": removed_items,
- "changed_items": changed_items,
- }
|