record_api.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. """
  2. 调用记录管理 API 端点
  3. 提供审查调试历史调用记录的查询、详情、回放和导出功能。
  4. 功能:
  5. 1. GET /api/call-records — 分页查询,支持链路/状态/时间范围/关键词筛选
  6. 2. GET /api/call-records/{record_id} — 单条记录详情
  7. 3. POST /api/call-records/replay — 回放(SSE 流,含 replay_comparison 事件)
  8. 4. POST /api/call-records/export — 批量导出 JSON 文件下载
  9. """
  10. import asyncio
  11. import json
  12. import logging
  13. from datetime import datetime
  14. from typing import Optional, List
  15. from fastapi import APIRouter, HTTPException, Query
  16. from fastapi.responses import Response, StreamingResponse
  17. from core.debug.record_manager import RecordManager
  18. from core.debug.sse_utils import (
  19. format_sse_event,
  20. debug_semaphore,
  21. MAX_CONCURRENT_DEBUG_TASKS,
  22. DEBUG_GLOBAL_TIMEOUT,
  23. make_trace_id,
  24. make_record_id,
  25. CHAIN_NAMES,
  26. CHAIN_STEPS_COUNT,
  27. )
  28. from views.debug.debug_api import (
  29. DebugExecuteRequest,
  30. ReplayRequest,
  31. ExportRequest,
  32. )
  33. logger = logging.getLogger(__name__)
  34. # 全局 RecordManager 实例
  35. _record_manager = None
  36. def _get_manager() -> RecordManager:
  37. """获取或创建 RecordManager 单例"""
  38. global _record_manager
  39. if _record_manager is None:
  40. _record_manager = RecordManager()
  41. return _record_manager
  42. def register_routes(router: APIRouter):
  43. """在指定 router 上注册调用记录管理路由"""
  44. @router.get(
  45. "/api/call-records",
  46. summary="查询调用记录列表",
  47. description="分页查询历史调用记录,支持链路筛选、状态筛选、时间范围筛选和关键词搜索。",
  48. )
  49. async def list_call_records(
  50. chain: str = Query(default=None, description="链路筛选"),
  51. status: str = Query(default=None, description="状态筛选: succ / fail / timeout"),
  52. time_range: str = Query(default=None, description="时间范围: today / 7d / 30d / all"),
  53. search: str = Query(default=None, description="关键词搜索(匹配文档名/ID)"),
  54. page: int = Query(default=1, ge=1, description="页码"),
  55. page_size: int = Query(default=20, ge=1, le=100, description="每页条数"),
  56. sort_by: str = Query(default="time", description="排序字段: time / duration"),
  57. sort_order: str = Query(default="desc", description="排序方向: desc / asc"),
  58. ):
  59. """
  60. 分页查询历史调用记录,支持筛选。
  61. """
  62. try:
  63. manager = _get_manager()
  64. result = await manager.list_records(
  65. chain_id=chain,
  66. status=status,
  67. time_range=time_range or "all",
  68. search=search,
  69. sort_field=sort_by,
  70. sort_order=sort_order,
  71. page=page,
  72. page_size=page_size,
  73. )
  74. # 补充 chains 信息(全链路列表)
  75. result["chains"] = [
  76. "完整性", "时效性", "规范性",
  77. "敏感词", "语义逻辑", "语法", "专业性",
  78. ]
  79. result["status"] = "ok"
  80. return result
  81. except Exception as e:
  82. logger.error("查询调用记录列表失败: %s", e)
  83. raise HTTPException(status_code=500, detail=str(e))
  84. @router.get(
  85. "/api/call-records/{record_id}",
  86. summary="获取调用详情",
  87. description="获取单条调用记录的完整详情,包括输入参数、各步骤输入输出、最终结果。",
  88. )
  89. async def get_call_record_detail(record_id: str):
  90. """
  91. 获取单条调用记录的完整详情。
  92. """
  93. try:
  94. manager = _get_manager()
  95. record = await manager.get_record(record_id)
  96. if record is None:
  97. raise HTTPException(
  98. status_code=404,
  99. detail=f"调用记录不存在: {record_id}",
  100. )
  101. return record
  102. except HTTPException:
  103. raise
  104. except Exception as e:
  105. logger.error("获取调用详情失败: %s", e)
  106. raise HTTPException(status_code=500, detail=str(e))
  107. @router.post(
  108. "/api/call-records/replay",
  109. summary="回放调用",
  110. description="用历史调用的输入参数重新执行审查,支持覆盖模型和提示词版本。返回 SSE 流式响应。",
  111. )
  112. async def replay_call_record(request: ReplayRequest):
  113. """
  114. 用历史调用的输入参数重新执行审查。
  115. 返回 SSE 流式响应,末尾追加 replay_comparison 事件。
  116. """
  117. # ---- 检查并发上限 ----
  118. if debug_semaphore.locked():
  119. raise HTTPException(
  120. status_code=429,
  121. detail=(
  122. f"并发调试任务数已达上限 ({MAX_CONCURRENT_DEBUG_TASKS}),"
  123. f"请等待其他任务完成后再试"
  124. ),
  125. )
  126. async def event_generator():
  127. async with debug_semaphore:
  128. manager = _get_manager()
  129. original = await manager.get_record(request.record_id)
  130. if original is None:
  131. yield format_sse_event("error", {
  132. "message": f"调用记录不存在: {request.record_id}",
  133. })
  134. return
  135. chain_id = original.get("chain", "completeness")
  136. total_steps = CHAIN_STEPS_COUNT.get(chain_id, 3)
  137. task_id = make_trace_id(chain_id)
  138. replay_record_id = make_record_id()
  139. # 构建执行参数(合并覆盖参数)
  140. override = request.override_params
  141. exec_params = original.get("params", {})
  142. # 深度拷贝 manual_inputs
  143. manual_inputs = dict(original.get("execution_params", {}).get("manual_inputs", {}))
  144. isolation_mode = original.get("execution_params", {}).get("isolation_mode", False)
  145. isolation_steps = list(original.get("execution_params", {}).get("isolation_steps", []))
  146. # 构建 DebugExecuteRequest
  147. exec_request = DebugExecuteRequest(
  148. chain_id=chain_id,
  149. content=exec_params.get("review_content", ""),
  150. reference=exec_params.get("review_references", ""),
  151. model=(
  152. override.model
  153. if override and override.model is not None
  154. else exec_params.get("model_override")
  155. ),
  156. function_name=exec_params.get("function_name", ""),
  157. prompt_version=(
  158. override.prompt_version
  159. if override and override.prompt_version is not None
  160. else original.get("prompt_ver")
  161. ),
  162. timeout=exec_params.get("timeout", 60),
  163. rag_params=(
  164. override.rag_params
  165. if override and override.rag_params is not None
  166. else original.get("execution_params", {}).get("rag_params")
  167. ),
  168. isolation_mode=isolation_mode,
  169. isolation_steps=isolation_steps,
  170. manual_inputs=manual_inputs,
  171. )
  172. # 发送自定义 started 事件(含 replay 标记)
  173. yield format_sse_event("started", {
  174. "task_id": task_id,
  175. "record_id": request.record_id,
  176. "is_replay": True,
  177. "chain_id": chain_id,
  178. "total_steps": total_steps,
  179. })
  180. # ---- 执行回放 ----
  181. event_queue: asyncio.Queue = asyncio.Queue()
  182. exec_task = asyncio.create_task(
  183. _run_replay_execution(exec_request, event_queue, task_id)
  184. )
  185. completed_data = None
  186. steps_collected: List[dict] = []
  187. replay_error = None
  188. try:
  189. while True:
  190. try:
  191. event_type, data = await asyncio.wait_for(
  192. event_queue.get(),
  193. timeout=DEBUG_GLOBAL_TIMEOUT,
  194. )
  195. except asyncio.TimeoutError:
  196. yield format_sse_event("error", {
  197. "task_id": task_id,
  198. "message": f"回放执行超时 (>{DEBUG_GLOBAL_TIMEOUT}s)",
  199. })
  200. exec_task.cancel()
  201. break
  202. if event_type == "__end__":
  203. break
  204. if event_type == "started":
  205. continue # 跳过 executor 的 started
  206. if event_type == "error":
  207. replay_error = data.get("message", "回放执行错误")
  208. yield format_sse_event(event_type, data)
  209. break
  210. if event_type == "step_result":
  211. steps_collected.append({
  212. "index": data.get("step_index"),
  213. "name": data.get("step_name"),
  214. "status": data.get("status"),
  215. "duration_ms": int((data.get("duration") or 0) * 1000),
  216. "input": data.get("input", {}),
  217. "output": data.get("output", {}),
  218. })
  219. if event_type == "completed":
  220. completed_data = data
  221. yield format_sse_event(event_type, data)
  222. break
  223. yield format_sse_event(event_type, data)
  224. except asyncio.CancelledError:
  225. yield format_sse_event("error", {
  226. "task_id": task_id,
  227. "message": "回放连接已断开",
  228. })
  229. except Exception as exc:
  230. logger.exception("[replay] 事件处理异常")
  231. yield format_sse_event("error", {
  232. "task_id": task_id,
  233. "message": str(exc),
  234. })
  235. finally:
  236. if not exec_task.done():
  237. exec_task.cancel()
  238. # ---- 计算 diff 并保存回放记录 ----
  239. if completed_data or replay_error:
  240. replay_status = "succ"
  241. if replay_error:
  242. replay_status = "timeout" if "超时" in replay_error else "fail"
  243. replay_duration = completed_data.get("total_duration", 0) if completed_data else 0
  244. replay_result_summary = ""
  245. if completed_data:
  246. replay_result_summary = str(
  247. completed_data.get("final_result", {}).get("summary", "")
  248. )
  249. # 保存回放记录
  250. replay_record_data = {
  251. "id": replay_record_id,
  252. "time": datetime.now().isoformat(),
  253. "chain": chain_id,
  254. "chain_name": CHAIN_NAMES.get(chain_id, ""),
  255. "doc_ref": original.get("doc_ref", ""),
  256. "status": replay_status,
  257. "duration_ms": int(replay_duration * 1000),
  258. "model": exec_request.model or "default",
  259. "prompt_ver": exec_request.prompt_version or "",
  260. "tokens": 0,
  261. "params": exec_params,
  262. "execution_params": {
  263. "isolation_mode": isolation_mode,
  264. "isolation_steps": isolation_steps,
  265. },
  266. "steps": steps_collected,
  267. "result": replay_result_summary,
  268. "error_message": replay_error,
  269. }
  270. try:
  271. await manager.save_record(replay_record_data)
  272. except Exception as exc:
  273. logger.warning("[replay] 保存回放记录失败: %s", exc)
  274. # 构建对比数据
  275. diff = _compute_replay_diff(original, replay_record_data)
  276. # 发送 replay_comparison 事件
  277. yield format_sse_event("replay_comparison", {
  278. "record_id": request.record_id,
  279. "original": {
  280. "time": original.get("time", ""),
  281. "model": original.get("model", ""),
  282. "prompt_ver": original.get("prompt_ver", ""),
  283. "duration_ms": original.get("duration_ms", 0),
  284. "status": original.get("status", ""),
  285. "result_summary": (original.get("result", "") or "")[:200],
  286. },
  287. "replay": {
  288. "time": datetime.now().isoformat(),
  289. "model": exec_request.model or "default",
  290. "prompt_ver": exec_request.prompt_version or "",
  291. "duration_ms": int(replay_duration * 1000),
  292. "status": replay_status,
  293. "result_summary": replay_result_summary[:200],
  294. },
  295. "diffs": diff,
  296. })
  297. # 发送 completed 事件(含 replay_record_id)
  298. yield format_sse_event("completed", {
  299. "task_id": task_id,
  300. "record_id": request.record_id,
  301. "replay_record_id": replay_record_id,
  302. "total_duration": round(replay_duration, 3),
  303. "final_result": (
  304. completed_data.get("final_result", {})
  305. if completed_data else {}
  306. ),
  307. })
  308. return StreamingResponse(
  309. event_generator(),
  310. media_type="text/event-stream",
  311. headers={
  312. "Cache-Control": "no-cache",
  313. "X-Accel-Buffering": "no",
  314. },
  315. )
  316. @router.post(
  317. "/api/call-records/export",
  318. summary="导出调用记录",
  319. description="导出选中的调用记录为 JSON 文件。",
  320. )
  321. async def export_call_records(request: ExportRequest):
  322. """
  323. 导出选中的调用记录为 JSON 文件下载。
  324. """
  325. try:
  326. manager = _get_manager()
  327. records_data = await manager.export_records(request.record_ids)
  328. if not records_data:
  329. raise HTTPException(
  330. status_code=404,
  331. detail="未找到任何调用记录",
  332. )
  333. content = json.dumps(records_data, ensure_ascii=False, indent=2)
  334. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  335. return Response(
  336. content=content,
  337. media_type="application/json",
  338. headers={
  339. "Content-Disposition": (
  340. f'attachment; filename="call_records_export_{timestamp}.json"'
  341. ),
  342. },
  343. )
  344. except HTTPException:
  345. raise
  346. except Exception as e:
  347. logger.error("导出调用记录失败: %s", e)
  348. raise HTTPException(status_code=500, detail=str(e))
  349. # ============================================================
  350. # 内部辅助函数
  351. # ============================================================
  352. async def _run_replay_execution(
  353. request: DebugExecuteRequest,
  354. event_queue: asyncio.Queue,
  355. task_id: str,
  356. ) -> None:
  357. """执行回放任务"""
  358. try:
  359. from core.debug.executor import DebugExecutor
  360. executor = DebugExecutor()
  361. await executor.execute(request, event_queue)
  362. except asyncio.CancelledError:
  363. await event_queue.put(("error", {
  364. "task_id": task_id,
  365. "message": "回放执行被取消",
  366. }))
  367. except Exception as exc:
  368. logger.exception("[_run_replay_execution] 执行异常")
  369. await event_queue.put(("error", {
  370. "task_id": task_id,
  371. "message": str(exc),
  372. }))
  373. def _compute_replay_diff(original: dict, replay: dict) -> dict:
  374. """
  375. 计算原始记录和回放结果的差异。
  376. Args:
  377. original: 原始调用记录
  378. replay: 回放调用记录
  379. Returns:
  380. diff 结构: {added_items, removed_items, changed_items}
  381. """
  382. added_items: list = []
  383. removed_items: list = []
  384. changed_items: list = []
  385. # 比较时长
  386. orig_dur = original.get("duration_ms", 0) or 0
  387. replay_dur = replay.get("duration_ms", 0) or 0
  388. if abs(orig_dur - replay_dur) > 100:
  389. changed_items.append({
  390. "field": "执行耗时",
  391. "original": f"{orig_dur}ms",
  392. "replay": f"{replay_dur}ms",
  393. })
  394. # 比较状态
  395. orig_status = original.get("status", "")
  396. replay_status = replay.get("status", "")
  397. if orig_status != replay_status:
  398. changed_items.append({
  399. "field": "执行状态",
  400. "original": orig_status,
  401. "replay": replay_status,
  402. })
  403. # 比较结果摘要
  404. orig_result = (original.get("result", "") or "").strip()
  405. replay_result = (replay.get("result", "") or "").strip()
  406. if orig_result != replay_result:
  407. changed_items.append({
  408. "field": "审查摘要",
  409. "original": orig_result[:200] if orig_result else "(无)",
  410. "replay": replay_result[:200] if replay_result else "(无)",
  411. })
  412. # 比较模型
  413. orig_model = original.get("model", "")
  414. replay_model = replay.get("model", "")
  415. if orig_model != replay_model:
  416. changed_items.append({
  417. "field": "模型",
  418. "original": orig_model,
  419. "replay": replay_model,
  420. })
  421. return {
  422. "added_items": added_items,
  423. "removed_items": removed_items,
  424. "changed_items": changed_items,
  425. }