""" 调用记录管理器 存储和查询审查调试调用记录,使用 JSON 文件存储。 每条记录为一个独立 JSON 文件,索引文件 index.json 加速列表查询。 记录 ID 格式: call-{YYYYMMDD}-{HHMMSS}-{hex} """ import os import json import uuid import asyncio import logging from datetime import datetime, timedelta logger = logging.getLogger(__name__) class RecordManager: """调用记录管理器""" RECORDS_DIR = "temp/debug_console/call_records/" MAX_RECORDS = 10000 MAX_INDEX = 500 RETENTION_DAYS = 30 def __init__(self): self._lock = asyncio.Lock() os.makedirs(self.RECORDS_DIR, exist_ok=True) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ async def save_record(self, record: dict) -> str: """保存调用记录,返回 record_id""" # 生成或沿用 record_id record_id = ( record.get("record_id") or record.get("id") or _generate_record_id() ) record_data = self._normalize_record(record, record_id) # 写入单条 JSON 文件 filepath = os.path.join(self.RECORDS_DIR, f"{record_id}.json") with open(filepath, "w", encoding="utf-8") as f: json.dump(record_data, f, ensure_ascii=False, indent=2) # 更新索引(线程安全) async with self._lock: index = await self._load_index() self._update_index_inner(index, record_data) self._write_index(index) # 超出上限时清理最旧记录 await self._enforce_max_records() return record_id async def get_record(self, record_id: str) -> dict | None: """获取单条调用记录,不存在时返回 None""" filepath = os.path.join(self.RECORDS_DIR, f"{record_id}.json") if not os.path.exists(filepath): return None with open(filepath, "r", encoding="utf-8") as f: return json.load(f) async def list_records( self, chain_id: str = None, status: str = None, time_range: str = None, search: str = None, sort_field: str = "timestamp", sort_order: str = "desc", page: int = 1, page_size: int = 20, ) -> dict: """分页查询调用记录列表 返回: { "total": int, "page": int, "page_size": int, "total_pages": int, "items": list[dict], "chains": list[str], "status_counts": dict[str, int], } """ async with self._lock: index = await self._load_index() records = list(index.get("records", [])) # ---- 筛选 ---- if chain_id: records = [r for r in records if r.get("chain") == chain_id] if status: records = [r for r in records if r.get("status") == status] if time_range and time_range != "all": records = self._filter_by_time_range(records, time_range) if search: q = search.lower() records = [ r for r in records if q in ( # 匹配 id / doc_ref / chain_name (r.get("id") or "") + (r.get("doc_ref") or "") + (r.get("chain_name") or "") ).lower() ] # ---- 排序 ---- reverse = sort_order.lower() != "asc" key_fn = _sort_key_func(sort_field) records.sort(key=key_fn, reverse=reverse) # ---- 分页 ---- total = len(records) start = (page - 1) * page_size items = records[start : start + page_size] # ---- 预处理前端展示字段 ---- for item in items: duration_ms = item.get("duration_ms", 0) or 0 if duration_ms >= 1000: item["duration"] = f"{duration_ms / 1000:.1f}s" elif duration_ms > 0: item["duration"] = f"{duration_ms}ms" else: item["duration"] = "0s" # ---- 聚合信息 ---- all_chains = sorted({r.get("chain") for r in records if r.get("chain")}) status_counts: dict[str, int] = {} for r in records: s = r.get("status", "unknown") status_counts[s] = status_counts.get(s, 0) + 1 total_pages = (total + page_size - 1) // page_size if total > 0 else 0 return { "total": total, "page": page, "page_size": page_size, "total_pages": total_pages, "items": items, "chains": all_chains, "status_counts": status_counts, } async def export_records(self, record_ids: list[str]) -> dict: """批量导出调用记录,返回 {record_id: record_data, ...}""" result: dict[str, dict] = {} for rid in record_ids: record = await self.get_record(rid) if record is not None: result[rid] = record return result async def cleanup_old_records(self) -> int: """删除超过 RETENTION_DAYS 天未修改的记录文件,返回清理数量""" cutoff = datetime.now() - timedelta(days=self.RETENTION_DAYS) cleaned = 0 if not os.path.isdir(self.RECORDS_DIR): return 0 for filename in list(os.listdir(self.RECORDS_DIR)): if not filename.endswith(".json") or filename == "index.json": continue filepath = os.path.join(self.RECORDS_DIR, filename) try: mtime = datetime.fromtimestamp(os.path.getmtime(filepath)) if mtime < cutoff: os.remove(filepath) cleaned += 1 except OSError as exc: logger.warning("清理过期记录文件失败 %s: %s", filepath, exc) # 同步清理索引中的已删除条目 async with self._lock: index = await self._load_index() alive_ids = set() for fn in os.listdir(self.RECORDS_DIR): if fn.endswith(".json") and fn != "index.json": alive_ids.add(fn[:-5]) # 去掉 .json 后缀 index["records"] = [r for r in index["records"] if r.get("id") in alive_ids] index["count"] = len(index["records"]) index["updated_at"] = datetime.now().isoformat() self._write_index(index) return cleaned # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _normalize_record(self, record: dict, record_id: str) -> dict: """将输入的记录 dict 归一化为统一的存储格式""" timestamp = ( record.get("timestamp") or record.get("time") or datetime.now().isoformat() ) chain = record.get("chain_id") or record.get("chain", "") duration_ms = record.get("duration_ms") or record.get("duration", 0) or 0 prompt_ver = record.get("prompt_version") or record.get("prompt_ver", "") result_text = record.get("result") or record.get("final_result", "") or "" error_msg = record.get("error_message") # 构造 params 对象 params = record.get("params") if params is None: inp = record.get("input", {}) params = { "review_content": inp.get("review_content", record.get("review_content", "")), "review_references": inp.get("review_references", record.get("review_references", "")), "model_override": record.get("model_override"), "function_name": record.get("function_name", ""), "timeout": record.get("timeout", 60), } # 构造 execution_params execution_params = record.get("execution_params") if execution_params is None: execution_params = { "isolation_mode": record.get("isolation_mode", False), "isolation_steps": record.get("isolation_steps", []), "rag_params": record.get("rag_params"), } return { "id": record_id, "time": timestamp, "chain": chain, "chain_name": record.get("chain_name", ""), "doc_ref": record.get("doc_ref", ""), "status": record.get("status", ""), "duration_ms": int(duration_ms) if not isinstance(duration_ms, int) else duration_ms, "model": record.get("model", ""), "function_name": record.get("function_name", ""), "prompt_ver": prompt_ver, "prompt_name": record.get("prompt_name", ""), "tokens": record.get("tokens", 0), "params": params, "execution_params": execution_params, "steps": record.get("steps", []), "result": result_text, "error_message": error_msg, } def _update_index_inner(self, index: dict, record: dict): """更新索引(调用方需持有 _lock)""" result_text = (record.get("result") or "")[:100] entry = { "id": record["id"], "time": record.get("time", datetime.now().isoformat()), "chain": record.get("chain", ""), "chain_name": record.get("chain_name", ""), "doc_ref": record.get("doc_ref", ""), "duration_ms": record.get("duration_ms", 0), "status": record.get("status", ""), "model": record.get("model", ""), "prompt_ver": record.get("prompt_ver", ""), "tokens": record.get("tokens", 0), "result_preview": result_text, } records_list = index.setdefault("records", []) records_list.insert(0, entry) # 裁剪到 MAX_INDEX if len(records_list) > self.MAX_INDEX: index["records"] = records_list[: self.MAX_INDEX] index["count"] = len(index["records"]) index["updated_at"] = datetime.now().isoformat() async def _load_index(self) -> dict: """加载索引文件内容""" path = os.path.join(self.RECORDS_DIR, "index.json") if os.path.isfile(path): try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError) as exc: logger.warning("索引文件损坏,将重新创建: %s", exc) return {"updated_at": datetime.now().isoformat(), "count": 0, "records": []} def _write_index(self, index: dict): """写入索引文件(调用方需持有 _lock)""" path = os.path.join(self.RECORDS_DIR, "index.json") with open(path, "w", encoding="utf-8") as f: json.dump(index, f, ensure_ascii=False, indent=2) async def _enforce_max_records(self): """当总记录数超过 MAX_RECORDS 时删除最旧的记录""" files: list[tuple[float, str]] = [] for fn in os.listdir(self.RECORDS_DIR): if not fn.endswith(".json") or fn == "index.json": continue fpath = os.path.join(self.RECORDS_DIR, fn) try: mtime = os.path.getmtime(fpath) files.append((mtime, fpath)) except OSError: continue if len(files) <= self.MAX_RECORDS: return files.sort(key=lambda x: x[0]) # 最旧在前 to_delete = len(files) - self.MAX_RECORDS for _, fpath in files[:to_delete]: try: os.remove(fpath) except OSError as exc: logger.warning("删除超出上限的旧记录失败 %s: %s", fpath, exc) @staticmethod def _filter_by_time_range(records: list, time_range: str) -> list: """按时间范围筛选记录""" now = datetime.now() range_map = { "today": now.replace(hour=0, minute=0, second=0, microsecond=0), "7d": now - timedelta(days=7), "30d": now - timedelta(days=30), } cutoff = range_map.get(time_range) if cutoff is None: return records # 未知范围,不过滤 result = [] for r in records: ts = r.get("time", "") if not ts: continue try: dt = datetime.fromisoformat(ts) if "T" in ts else datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") if dt >= cutoff: result.append(r) except (ValueError, TypeError): result.append(r) return result # ------------------------------------------------------------------ # Module-level helpers # ------------------------------------------------------------------ def _generate_record_id() -> str: """生成记录 ID: call-{YYYYMMDD}-{HHMMSS}-{hex}""" now = datetime.now() return f"call-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}-{uuid.uuid4().hex[:6]}" def _sort_key_func(sort_field: str): """根据排序字段名返回对应的 key 函数""" field = sort_field if field == "timestamp": field = "time" elif field == "duration": field = "duration_ms" def key_fn(r): val = r.get(field) if field == "duration_ms": try: return float(val) if val is not None else 0 except (ValueError, TypeError): return 0 if val is None: return "" if field != "duration_ms" else 0 return val return key_fn