| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- """
- 调用记录管理器
- 存储和查询审查调试调用记录,使用 JSON 文件存储。
- 每条记录为一个独立 JSON 文件,索引文件 index.json 加速列表查询。
- 记录 ID 格式: call-{YYYYMMDD}-{HHMMSS}-{hex}
- """
- import os
- import json
- import uuid
- import asyncio
- import logging
- from datetime import datetime, timedelta
- logger = logging.getLogger(__name__)
- class RecordManager:
- """调用记录管理器"""
- RECORDS_DIR = "temp/debug_console/call_records/"
- MAX_RECORDS = 10000
- MAX_INDEX = 500
- RETENTION_DAYS = 30
- def __init__(self):
- self._lock = asyncio.Lock()
- os.makedirs(self.RECORDS_DIR, exist_ok=True)
- # ------------------------------------------------------------------
- # Public API
- # ------------------------------------------------------------------
- async def save_record(self, record: dict) -> str:
- """保存调用记录,返回 record_id"""
- # 生成或沿用 record_id
- record_id = (
- record.get("record_id")
- or record.get("id")
- or _generate_record_id()
- )
- record_data = self._normalize_record(record, record_id)
- # 写入单条 JSON 文件
- filepath = os.path.join(self.RECORDS_DIR, f"{record_id}.json")
- with open(filepath, "w", encoding="utf-8") as f:
- json.dump(record_data, f, ensure_ascii=False, indent=2)
- # 更新索引(线程安全)
- async with self._lock:
- index = await self._load_index()
- self._update_index_inner(index, record_data)
- self._write_index(index)
- # 超出上限时清理最旧记录
- await self._enforce_max_records()
- return record_id
- async def get_record(self, record_id: str) -> dict | None:
- """获取单条调用记录,不存在时返回 None"""
- filepath = os.path.join(self.RECORDS_DIR, f"{record_id}.json")
- if not os.path.exists(filepath):
- return None
- with open(filepath, "r", encoding="utf-8") as f:
- return json.load(f)
- async def list_records(
- self,
- chain_id: str = None,
- status: str = None,
- time_range: str = None,
- search: str = None,
- sort_field: str = "timestamp",
- sort_order: str = "desc",
- page: int = 1,
- page_size: int = 20,
- ) -> dict:
- """分页查询调用记录列表
- 返回:
- {
- "total": int,
- "page": int,
- "page_size": int,
- "total_pages": int,
- "items": list[dict],
- "chains": list[str],
- "status_counts": dict[str, int],
- }
- """
- async with self._lock:
- index = await self._load_index()
- records = list(index.get("records", []))
- # ---- 筛选 ----
- if chain_id:
- records = [r for r in records if r.get("chain") == chain_id]
- if status:
- records = [r for r in records if r.get("status") == status]
- if time_range and time_range != "all":
- records = self._filter_by_time_range(records, time_range)
- if search:
- q = search.lower()
- records = [
- r
- for r in records
- if q
- in ( # 匹配 id / doc_ref / chain_name
- (r.get("id") or "")
- + (r.get("doc_ref") or "")
- + (r.get("chain_name") or "")
- ).lower()
- ]
- # ---- 排序 ----
- reverse = sort_order.lower() != "asc"
- key_fn = _sort_key_func(sort_field)
- records.sort(key=key_fn, reverse=reverse)
- # ---- 分页 ----
- total = len(records)
- start = (page - 1) * page_size
- items = records[start : start + page_size]
- # ---- 预处理前端展示字段 ----
- for item in items:
- duration_ms = item.get("duration_ms", 0) or 0
- if duration_ms >= 1000:
- item["duration"] = f"{duration_ms / 1000:.1f}s"
- elif duration_ms > 0:
- item["duration"] = f"{duration_ms}ms"
- else:
- item["duration"] = "0s"
- # ---- 聚合信息 ----
- all_chains = sorted({r.get("chain") for r in records if r.get("chain")})
- status_counts: dict[str, int] = {}
- for r in records:
- s = r.get("status", "unknown")
- status_counts[s] = status_counts.get(s, 0) + 1
- total_pages = (total + page_size - 1) // page_size if total > 0 else 0
- return {
- "total": total,
- "page": page,
- "page_size": page_size,
- "total_pages": total_pages,
- "items": items,
- "chains": all_chains,
- "status_counts": status_counts,
- }
- async def export_records(self, record_ids: list[str]) -> dict:
- """批量导出调用记录,返回 {record_id: record_data, ...}"""
- result: dict[str, dict] = {}
- for rid in record_ids:
- record = await self.get_record(rid)
- if record is not None:
- result[rid] = record
- return result
- async def cleanup_old_records(self) -> int:
- """删除超过 RETENTION_DAYS 天未修改的记录文件,返回清理数量"""
- cutoff = datetime.now() - timedelta(days=self.RETENTION_DAYS)
- cleaned = 0
- if not os.path.isdir(self.RECORDS_DIR):
- return 0
- for filename in list(os.listdir(self.RECORDS_DIR)):
- if not filename.endswith(".json") or filename == "index.json":
- continue
- filepath = os.path.join(self.RECORDS_DIR, filename)
- try:
- mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
- if mtime < cutoff:
- os.remove(filepath)
- cleaned += 1
- except OSError as exc:
- logger.warning("清理过期记录文件失败 %s: %s", filepath, exc)
- # 同步清理索引中的已删除条目
- async with self._lock:
- index = await self._load_index()
- alive_ids = set()
- for fn in os.listdir(self.RECORDS_DIR):
- if fn.endswith(".json") and fn != "index.json":
- alive_ids.add(fn[:-5]) # 去掉 .json 后缀
- index["records"] = [r for r in index["records"] if r.get("id") in alive_ids]
- index["count"] = len(index["records"])
- index["updated_at"] = datetime.now().isoformat()
- self._write_index(index)
- return cleaned
- # ------------------------------------------------------------------
- # Internal helpers
- # ------------------------------------------------------------------
- def _normalize_record(self, record: dict, record_id: str) -> dict:
- """将输入的记录 dict 归一化为统一的存储格式"""
- timestamp = (
- record.get("timestamp")
- or record.get("time")
- or datetime.now().isoformat()
- )
- chain = record.get("chain_id") or record.get("chain", "")
- duration_ms = record.get("duration_ms") or record.get("duration", 0) or 0
- prompt_ver = record.get("prompt_version") or record.get("prompt_ver", "")
- result_text = record.get("result") or record.get("final_result", "") or ""
- error_msg = record.get("error_message")
- # 构造 params 对象
- params = record.get("params")
- if params is None:
- inp = record.get("input", {})
- params = {
- "review_content": inp.get("review_content", record.get("review_content", "")),
- "review_references": inp.get("review_references", record.get("review_references", "")),
- "model_override": record.get("model_override"),
- "function_name": record.get("function_name", ""),
- "timeout": record.get("timeout", 60),
- }
- # 构造 execution_params
- execution_params = record.get("execution_params")
- if execution_params is None:
- execution_params = {
- "isolation_mode": record.get("isolation_mode", False),
- "isolation_steps": record.get("isolation_steps", []),
- "rag_params": record.get("rag_params"),
- }
- return {
- "id": record_id,
- "time": timestamp,
- "chain": chain,
- "chain_name": record.get("chain_name", ""),
- "doc_ref": record.get("doc_ref", ""),
- "status": record.get("status", ""),
- "duration_ms": int(duration_ms) if not isinstance(duration_ms, int) else duration_ms,
- "model": record.get("model", ""),
- "function_name": record.get("function_name", ""),
- "prompt_ver": prompt_ver,
- "prompt_name": record.get("prompt_name", ""),
- "tokens": record.get("tokens", 0),
- "params": params,
- "execution_params": execution_params,
- "steps": record.get("steps", []),
- "result": result_text,
- "error_message": error_msg,
- }
- def _update_index_inner(self, index: dict, record: dict):
- """更新索引(调用方需持有 _lock)"""
- result_text = (record.get("result") or "")[:100]
- entry = {
- "id": record["id"],
- "time": record.get("time", datetime.now().isoformat()),
- "chain": record.get("chain", ""),
- "chain_name": record.get("chain_name", ""),
- "doc_ref": record.get("doc_ref", ""),
- "duration_ms": record.get("duration_ms", 0),
- "status": record.get("status", ""),
- "model": record.get("model", ""),
- "prompt_ver": record.get("prompt_ver", ""),
- "tokens": record.get("tokens", 0),
- "result_preview": result_text,
- }
- records_list = index.setdefault("records", [])
- records_list.insert(0, entry)
- # 裁剪到 MAX_INDEX
- if len(records_list) > self.MAX_INDEX:
- index["records"] = records_list[: self.MAX_INDEX]
- index["count"] = len(index["records"])
- index["updated_at"] = datetime.now().isoformat()
- async def _load_index(self) -> dict:
- """加载索引文件内容"""
- path = os.path.join(self.RECORDS_DIR, "index.json")
- if os.path.isfile(path):
- try:
- with open(path, "r", encoding="utf-8") as f:
- return json.load(f)
- except (json.JSONDecodeError, OSError) as exc:
- logger.warning("索引文件损坏,将重新创建: %s", exc)
- return {"updated_at": datetime.now().isoformat(), "count": 0, "records": []}
- def _write_index(self, index: dict):
- """写入索引文件(调用方需持有 _lock)"""
- path = os.path.join(self.RECORDS_DIR, "index.json")
- with open(path, "w", encoding="utf-8") as f:
- json.dump(index, f, ensure_ascii=False, indent=2)
- async def _enforce_max_records(self):
- """当总记录数超过 MAX_RECORDS 时删除最旧的记录"""
- files: list[tuple[float, str]] = []
- for fn in os.listdir(self.RECORDS_DIR):
- if not fn.endswith(".json") or fn == "index.json":
- continue
- fpath = os.path.join(self.RECORDS_DIR, fn)
- try:
- mtime = os.path.getmtime(fpath)
- files.append((mtime, fpath))
- except OSError:
- continue
- if len(files) <= self.MAX_RECORDS:
- return
- files.sort(key=lambda x: x[0]) # 最旧在前
- to_delete = len(files) - self.MAX_RECORDS
- for _, fpath in files[:to_delete]:
- try:
- os.remove(fpath)
- except OSError as exc:
- logger.warning("删除超出上限的旧记录失败 %s: %s", fpath, exc)
- @staticmethod
- def _filter_by_time_range(records: list, time_range: str) -> list:
- """按时间范围筛选记录"""
- now = datetime.now()
- range_map = {
- "today": now.replace(hour=0, minute=0, second=0, microsecond=0),
- "7d": now - timedelta(days=7),
- "30d": now - timedelta(days=30),
- }
- cutoff = range_map.get(time_range)
- if cutoff is None:
- return records # 未知范围,不过滤
- result = []
- for r in records:
- ts = r.get("time", "")
- if not ts:
- continue
- try:
- dt = datetime.fromisoformat(ts) if "T" in ts else datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
- if dt >= cutoff:
- result.append(r)
- except (ValueError, TypeError):
- result.append(r)
- return result
- # ------------------------------------------------------------------
- # Module-level helpers
- # ------------------------------------------------------------------
def _generate_record_id() -> str:
    """Build a new record ID shaped like ``call-{YYYYMMDD}-{HHMMSS}-{hex}``.

    The date and time parts come from the local clock; the trailing part is
    the first six hex digits of a random UUID4.
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    return "-".join(("call", stamp, suffix))
def _sort_key_func(sort_field: str):
    """Return a key function for sorting record dicts by *sort_field*.

    ``"timestamp"`` is an alias for the ``time`` field and ``"duration"`` for
    ``duration_ms``. The key is a ``(rank, value)`` tuple so that records with
    different value types never get compared against each other directly:
    missing/``None`` values rank first, then numbers, then strings, then
    anything else (compared by its ``str()``). Previously a missing value
    became ``""``, which raised ``TypeError`` when the other records held
    numbers.

    For ``duration_ms`` the value is converted with ``float``; missing or
    unconvertible values count as 0.
    """
    aliases = {"timestamp": "time", "duration": "duration_ms"}
    field = aliases.get(sort_field, sort_field)

    def key_fn(r):
        val = r.get(field)
        if field == "duration_ms":
            try:
                return (1, float(val) if val is not None else 0)
            except (ValueError, TypeError):
                return (1, 0)
        if val is None:
            return (0, "")
        if isinstance(val, (int, float)):
            return (1, val)
        if isinstance(val, str):
            return (2, val)
        # Fallback for other types: order them by their text form.
        return (3, str(val))

    return key_fn
|