# record_manager.py
"""Call record manager.

Stores and queries review/debug call records, using JSON files for storage.
Each record is a standalone JSON file; an index file, index.json, speeds up
list queries.
Record ID format: call-{YYYYMMDD}-{HHMMSS}-{hex}
"""
import asyncio
import json
import logging
import os
import uuid
from collections import Counter
from datetime import datetime, timedelta
  13. logger = logging.getLogger(__name__)
  14. class RecordManager:
  15. """调用记录管理器"""
  16. RECORDS_DIR = "temp/debug_console/call_records/"
  17. MAX_RECORDS = 10000
  18. MAX_INDEX = 500
  19. RETENTION_DAYS = 30
  20. def __init__(self):
  21. self._lock = asyncio.Lock()
  22. os.makedirs(self.RECORDS_DIR, exist_ok=True)
  23. # ------------------------------------------------------------------
  24. # Public API
  25. # ------------------------------------------------------------------
  26. async def save_record(self, record: dict) -> str:
  27. """保存调用记录,返回 record_id"""
  28. # 生成或沿用 record_id
  29. record_id = (
  30. record.get("record_id")
  31. or record.get("id")
  32. or _generate_record_id()
  33. )
  34. record_data = self._normalize_record(record, record_id)
  35. # 写入单条 JSON 文件
  36. filepath = os.path.join(self.RECORDS_DIR, f"{record_id}.json")
  37. with open(filepath, "w", encoding="utf-8") as f:
  38. json.dump(record_data, f, ensure_ascii=False, indent=2)
  39. # 更新索引(线程安全)
  40. async with self._lock:
  41. index = await self._load_index()
  42. self._update_index_inner(index, record_data)
  43. self._write_index(index)
  44. # 超出上限时清理最旧记录
  45. await self._enforce_max_records()
  46. return record_id
  47. async def get_record(self, record_id: str) -> dict | None:
  48. """获取单条调用记录,不存在时返回 None"""
  49. filepath = os.path.join(self.RECORDS_DIR, f"{record_id}.json")
  50. if not os.path.exists(filepath):
  51. return None
  52. with open(filepath, "r", encoding="utf-8") as f:
  53. return json.load(f)
  54. async def list_records(
  55. self,
  56. chain_id: str = None,
  57. status: str = None,
  58. time_range: str = None,
  59. search: str = None,
  60. sort_field: str = "timestamp",
  61. sort_order: str = "desc",
  62. page: int = 1,
  63. page_size: int = 20,
  64. ) -> dict:
  65. """分页查询调用记录列表
  66. 返回:
  67. {
  68. "total": int,
  69. "page": int,
  70. "page_size": int,
  71. "total_pages": int,
  72. "items": list[dict],
  73. "chains": list[str],
  74. "status_counts": dict[str, int],
  75. }
  76. """
  77. async with self._lock:
  78. index = await self._load_index()
  79. records = list(index.get("records", []))
  80. # ---- 筛选 ----
  81. if chain_id:
  82. records = [r for r in records if r.get("chain") == chain_id]
  83. if status:
  84. records = [r for r in records if r.get("status") == status]
  85. if time_range and time_range != "all":
  86. records = self._filter_by_time_range(records, time_range)
  87. if search:
  88. q = search.lower()
  89. records = [
  90. r
  91. for r in records
  92. if q
  93. in ( # 匹配 id / doc_ref / chain_name
  94. (r.get("id") or "")
  95. + (r.get("doc_ref") or "")
  96. + (r.get("chain_name") or "")
  97. ).lower()
  98. ]
  99. # ---- 排序 ----
  100. reverse = sort_order.lower() != "asc"
  101. key_fn = _sort_key_func(sort_field)
  102. records.sort(key=key_fn, reverse=reverse)
  103. # ---- 分页 ----
  104. total = len(records)
  105. start = (page - 1) * page_size
  106. items = records[start : start + page_size]
  107. # ---- 预处理前端展示字段 ----
  108. for item in items:
  109. duration_ms = item.get("duration_ms", 0) or 0
  110. if duration_ms >= 1000:
  111. item["duration"] = f"{duration_ms / 1000:.1f}s"
  112. elif duration_ms > 0:
  113. item["duration"] = f"{duration_ms}ms"
  114. else:
  115. item["duration"] = "0s"
  116. # ---- 聚合信息 ----
  117. all_chains = sorted({r.get("chain") for r in records if r.get("chain")})
  118. status_counts: dict[str, int] = {}
  119. for r in records:
  120. s = r.get("status", "unknown")
  121. status_counts[s] = status_counts.get(s, 0) + 1
  122. total_pages = (total + page_size - 1) // page_size if total > 0 else 0
  123. return {
  124. "total": total,
  125. "page": page,
  126. "page_size": page_size,
  127. "total_pages": total_pages,
  128. "items": items,
  129. "chains": all_chains,
  130. "status_counts": status_counts,
  131. }
  132. async def export_records(self, record_ids: list[str]) -> dict:
  133. """批量导出调用记录,返回 {record_id: record_data, ...}"""
  134. result: dict[str, dict] = {}
  135. for rid in record_ids:
  136. record = await self.get_record(rid)
  137. if record is not None:
  138. result[rid] = record
  139. return result
  140. async def cleanup_old_records(self) -> int:
  141. """删除超过 RETENTION_DAYS 天未修改的记录文件,返回清理数量"""
  142. cutoff = datetime.now() - timedelta(days=self.RETENTION_DAYS)
  143. cleaned = 0
  144. if not os.path.isdir(self.RECORDS_DIR):
  145. return 0
  146. for filename in list(os.listdir(self.RECORDS_DIR)):
  147. if not filename.endswith(".json") or filename == "index.json":
  148. continue
  149. filepath = os.path.join(self.RECORDS_DIR, filename)
  150. try:
  151. mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
  152. if mtime < cutoff:
  153. os.remove(filepath)
  154. cleaned += 1
  155. except OSError as exc:
  156. logger.warning("清理过期记录文件失败 %s: %s", filepath, exc)
  157. # 同步清理索引中的已删除条目
  158. async with self._lock:
  159. index = await self._load_index()
  160. alive_ids = set()
  161. for fn in os.listdir(self.RECORDS_DIR):
  162. if fn.endswith(".json") and fn != "index.json":
  163. alive_ids.add(fn[:-5]) # 去掉 .json 后缀
  164. index["records"] = [r for r in index["records"] if r.get("id") in alive_ids]
  165. index["count"] = len(index["records"])
  166. index["updated_at"] = datetime.now().isoformat()
  167. self._write_index(index)
  168. return cleaned
  169. # ------------------------------------------------------------------
  170. # Internal helpers
  171. # ------------------------------------------------------------------
  172. def _normalize_record(self, record: dict, record_id: str) -> dict:
  173. """将输入的记录 dict 归一化为统一的存储格式"""
  174. timestamp = (
  175. record.get("timestamp")
  176. or record.get("time")
  177. or datetime.now().isoformat()
  178. )
  179. chain = record.get("chain_id") or record.get("chain", "")
  180. duration_ms = record.get("duration_ms") or record.get("duration", 0) or 0
  181. prompt_ver = record.get("prompt_version") or record.get("prompt_ver", "")
  182. result_text = record.get("result") or record.get("final_result", "") or ""
  183. error_msg = record.get("error_message")
  184. # 构造 params 对象
  185. params = record.get("params")
  186. if params is None:
  187. inp = record.get("input", {})
  188. params = {
  189. "review_content": inp.get("review_content", record.get("review_content", "")),
  190. "review_references": inp.get("review_references", record.get("review_references", "")),
  191. "model_override": record.get("model_override"),
  192. "function_name": record.get("function_name", ""),
  193. "timeout": record.get("timeout", 60),
  194. }
  195. # 构造 execution_params
  196. execution_params = record.get("execution_params")
  197. if execution_params is None:
  198. execution_params = {
  199. "isolation_mode": record.get("isolation_mode", False),
  200. "isolation_steps": record.get("isolation_steps", []),
  201. "rag_params": record.get("rag_params"),
  202. }
  203. return {
  204. "id": record_id,
  205. "time": timestamp,
  206. "chain": chain,
  207. "chain_name": record.get("chain_name", ""),
  208. "doc_ref": record.get("doc_ref", ""),
  209. "status": record.get("status", ""),
  210. "duration_ms": int(duration_ms) if not isinstance(duration_ms, int) else duration_ms,
  211. "model": record.get("model", ""),
  212. "function_name": record.get("function_name", ""),
  213. "prompt_ver": prompt_ver,
  214. "prompt_name": record.get("prompt_name", ""),
  215. "tokens": record.get("tokens", 0),
  216. "params": params,
  217. "execution_params": execution_params,
  218. "steps": record.get("steps", []),
  219. "result": result_text,
  220. "error_message": error_msg,
  221. }
  222. def _update_index_inner(self, index: dict, record: dict):
  223. """更新索引(调用方需持有 _lock)"""
  224. result_text = (record.get("result") or "")[:100]
  225. entry = {
  226. "id": record["id"],
  227. "time": record.get("time", datetime.now().isoformat()),
  228. "chain": record.get("chain", ""),
  229. "chain_name": record.get("chain_name", ""),
  230. "doc_ref": record.get("doc_ref", ""),
  231. "duration_ms": record.get("duration_ms", 0),
  232. "status": record.get("status", ""),
  233. "model": record.get("model", ""),
  234. "prompt_ver": record.get("prompt_ver", ""),
  235. "tokens": record.get("tokens", 0),
  236. "result_preview": result_text,
  237. }
  238. records_list = index.setdefault("records", [])
  239. records_list.insert(0, entry)
  240. # 裁剪到 MAX_INDEX
  241. if len(records_list) > self.MAX_INDEX:
  242. index["records"] = records_list[: self.MAX_INDEX]
  243. index["count"] = len(index["records"])
  244. index["updated_at"] = datetime.now().isoformat()
  245. async def _load_index(self) -> dict:
  246. """加载索引文件内容"""
  247. path = os.path.join(self.RECORDS_DIR, "index.json")
  248. if os.path.isfile(path):
  249. try:
  250. with open(path, "r", encoding="utf-8") as f:
  251. return json.load(f)
  252. except (json.JSONDecodeError, OSError) as exc:
  253. logger.warning("索引文件损坏,将重新创建: %s", exc)
  254. return {"updated_at": datetime.now().isoformat(), "count": 0, "records": []}
  255. def _write_index(self, index: dict):
  256. """写入索引文件(调用方需持有 _lock)"""
  257. path = os.path.join(self.RECORDS_DIR, "index.json")
  258. with open(path, "w", encoding="utf-8") as f:
  259. json.dump(index, f, ensure_ascii=False, indent=2)
  260. async def _enforce_max_records(self):
  261. """当总记录数超过 MAX_RECORDS 时删除最旧的记录"""
  262. files: list[tuple[float, str]] = []
  263. for fn in os.listdir(self.RECORDS_DIR):
  264. if not fn.endswith(".json") or fn == "index.json":
  265. continue
  266. fpath = os.path.join(self.RECORDS_DIR, fn)
  267. try:
  268. mtime = os.path.getmtime(fpath)
  269. files.append((mtime, fpath))
  270. except OSError:
  271. continue
  272. if len(files) <= self.MAX_RECORDS:
  273. return
  274. files.sort(key=lambda x: x[0]) # 最旧在前
  275. to_delete = len(files) - self.MAX_RECORDS
  276. for _, fpath in files[:to_delete]:
  277. try:
  278. os.remove(fpath)
  279. except OSError as exc:
  280. logger.warning("删除超出上限的旧记录失败 %s: %s", fpath, exc)
  281. @staticmethod
  282. def _filter_by_time_range(records: list, time_range: str) -> list:
  283. """按时间范围筛选记录"""
  284. now = datetime.now()
  285. range_map = {
  286. "today": now.replace(hour=0, minute=0, second=0, microsecond=0),
  287. "7d": now - timedelta(days=7),
  288. "30d": now - timedelta(days=30),
  289. }
  290. cutoff = range_map.get(time_range)
  291. if cutoff is None:
  292. return records # 未知范围,不过滤
  293. result = []
  294. for r in records:
  295. ts = r.get("time", "")
  296. if not ts:
  297. continue
  298. try:
  299. dt = datetime.fromisoformat(ts) if "T" in ts else datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
  300. if dt >= cutoff:
  301. result.append(r)
  302. except (ValueError, TypeError):
  303. result.append(r)
  304. return result
# ------------------------------------------------------------------
# Module-level helpers
# ------------------------------------------------------------------
  308. def _generate_record_id() -> str:
  309. """生成记录 ID: call-{YYYYMMDD}-{HHMMSS}-{hex}"""
  310. now = datetime.now()
  311. return f"call-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}-{uuid.uuid4().hex[:6]}"
  312. def _sort_key_func(sort_field: str):
  313. """根据排序字段名返回对应的 key 函数"""
  314. field = sort_field
  315. if field == "timestamp":
  316. field = "time"
  317. elif field == "duration":
  318. field = "duration_ms"
  319. def key_fn(r):
  320. val = r.get(field)
  321. if field == "duration_ms":
  322. try:
  323. return float(val) if val is not None else 0
  324. except (ValueError, TypeError):
  325. return 0
  326. if val is None:
  327. return "" if field != "duration_ms" else 0
  328. return val
  329. return key_fn