#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Cache management module: one interface for cache file paths and cache I/O.

Usage:
    from foundation.observability.cachefiles import cache, CacheBaseDir

    # Save data. The base directory must be given explicitly; the file name
    # is taken from the caller's variable name and the extension from the
    # data type (.json for dict/list, .csv for DataFrame, .txt for str).
    catogues_result = {"key": "value"}
    cache.document_temp(
        catogues_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
    )
    # -> temp/construction_review/document_temp/catogues_result.json

    cache.document_temp(
        catogues_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_WRITE
    )
    # -> temp/construction_write/document_temp/catogues_result.json
"""

from __future__ import annotations

import inspect
import json
from enum import Enum
from pathlib import Path
from typing import Any, Optional, Union

import pandas as pd

from foundation.observability.logger.loggering import review_logger as logger


class CacheBaseDir(str, Enum):
    """Root directories under which cache files are stored."""

    CONSTRUCTION_REVIEW = "temp/construction_review"
    CONSTRUCTION_WRITE = "temp/construction_write"
    RAG_MONITORING = "temp/rag_monitoring"


class _SubDirCaller:
    """Callable bound to one cache sub-directory.

    Instances are returned by ``CacheManager.__getattr__`` so that
    ``cache.<subdir>(data, base_cache_dir=...)`` saves ``data`` into that
    sub-directory.
    """

    def __init__(self, subdir: str):
        self.subdir = subdir

    def __call__(
        self,
        data: Any,
        base_cache_dir: Union[str, CacheBaseDir],
    ) -> Path:
        """Save ``data``, naming the file after the caller's variable.

        Args:
            data: Object to persist (DataFrame, dict, list or str).
            base_cache_dir: Cache root directory (required).

        Returns:
            Path of the file written by ``cache.save``.

        Raises:
            TypeError: Propagated from ``cache.save`` for unsupported types.
        """
        # NOTE: _get_var_name walks exactly two frames up, so it must be
        # called directly from __call__ and not through another helper.
        stem = self._get_var_name(data)
        # Without a suffix the file could never be read back: load()
        # dispatches on the extension and rejects an empty one.
        filename = stem + self._extension_for(data)
        return cache.save(data, self.subdir, filename, base_cache_dir)

    @staticmethod
    def _extension_for(data: Any) -> str:
        """Return the suffix matching the format ``CacheManager.save`` writes."""
        if isinstance(data, pd.DataFrame):
            return ".csv"
        if isinstance(data, (dict, list)):
            return ".json"
        if isinstance(data, str):
            return ".txt"
        # Unsupported type: leave the name untouched, save() raises TypeError.
        return ""

    def _get_var_name(self, data: Any) -> str:
        """Return the name bound to ``data`` in the frame that invoked ``__call__``.

        The lookup is by identity over the caller's local variables; the
        first matching name wins. Falls back to ``"data"`` when no local is
        bound to the object or when frame introspection is unavailable.
        """
        frame = inspect.currentframe()
        caller_frame = None
        try:
            # Two frames up: _get_var_name -> __call__ -> actual call site.
            # currentframe() and f_back may both be None, so guard each hop.
            call_frame = frame.f_back if frame is not None else None
            caller_frame = call_frame.f_back if call_frame is not None else None
            if caller_frame is None:
                return "data"

            for name, value in caller_frame.f_locals.items():
                if value is data:
                    return name
            return "data"
        finally:
            # Drop frame references so they do not create reference cycles.
            del frame, caller_frame


class CacheManager:
    """Process-wide cache manager (singleton)."""

    _instance: Optional["CacheManager"] = None

    def __new__(cls) -> "CacheManager":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every CacheManager() call; only the first one
        # performs the actual setup.
        if self._initialized:
            return
        self._initialized = True

        from .cache_files import SUBDIRS

        self._subdirs = SUBDIRS
        self._ensure_directories()

    def _ensure_directories(self):
        """Create the cache root directories and the known sub-directories.

        Sub-directories are pre-created only under CONSTRUCTION_REVIEW; for
        the other roots ``save`` creates the parent directory on demand.
        """
        for base_dir in CacheBaseDir:
            Path(base_dir.value).mkdir(parents=True, exist_ok=True)

        base = Path(CacheBaseDir.CONSTRUCTION_REVIEW.value)
        for subdir in self._subdirs.values():
            (base / subdir).mkdir(parents=True, exist_ok=True)

        logger.info("[缓存] 缓存目录已初始化")

    def __getattr__(self, subdir: str) -> _SubDirCaller:
        """Resolve ``cache.<subdir>`` to a callable that saves into that sub-directory.

        Raises:
            AttributeError: If ``subdir`` is not a registered sub-directory.
        """
        # Read _subdirs straight from the instance dict: going through
        # ``self._subdirs`` would re-enter __getattr__ and recurse forever
        # whenever the attribute has not been assigned yet.
        subdirs = self.__dict__.get("_subdirs", {})
        if subdir in subdirs:
            return _SubDirCaller(subdirs[subdir])
        raise AttributeError(
            f"'CacheManager' has no subdir '{subdir}'. "
            f"Available: {list(subdirs.keys())}"
        )

    @staticmethod
    def _resolve_base(base_cache_dir: Union[str, CacheBaseDir]) -> Path:
        """Turn a ``CacheBaseDir`` member or a plain path string into a ``Path``."""
        # Use .value explicitly rather than relying on how Path treats a
        # str-subclass enum member.
        if isinstance(base_cache_dir, CacheBaseDir):
            return Path(base_cache_dir.value)
        return Path(base_cache_dir)

    def save(
        self,
        data: Any,
        subdir: str,
        filename: str,
        base_cache_dir: Union[str, CacheBaseDir] = CacheBaseDir.CONSTRUCTION_REVIEW,
    ) -> Path:
        """Write ``data`` to ``<base_cache_dir>/<subdir>/<filename>``.

        The on-disk format depends on the type of ``data``: DataFrame -> CSV
        (utf-8-sig, no index), dict/list -> indented JSON, str -> plain text.

        Args:
            data: Object to persist.
            subdir: Sub-directory below the cache root.
            filename: Target file name, used as given.
            base_cache_dir: Cache root directory.

        Returns:
            Path of the written file.

        Raises:
            TypeError: If ``data`` is none of the supported types.
        """
        filepath = self._resolve_base(base_cache_dir) / subdir / filename
        filepath.parent.mkdir(parents=True, exist_ok=True)

        if isinstance(data, pd.DataFrame):
            data.to_csv(filepath, index=False, encoding='utf-8-sig')
        elif isinstance(data, (dict, list)):
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        elif isinstance(data, str):
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(data)
        else:
            raise TypeError(f"不支持的数据类型: {type(data)}")

        logger.debug(f"[缓存] 已保存: {filepath}")
        return filepath

    def load(
        self,
        subdir: str,
        filename: str,
        base_cache_dir: Union[str, CacheBaseDir] = CacheBaseDir.CONSTRUCTION_REVIEW,
        default: Any = None,
    ) -> Any:
        """Read ``<base_cache_dir>/<subdir>/<filename>`` back into memory.

        The parser is chosen from the file extension: ``.csv`` -> DataFrame,
        ``.json`` -> parsed JSON, ``.txt``/``.log``/``.md`` -> str.

        Args:
            subdir: Sub-directory below the cache root.
            filename: File name including its extension.
            base_cache_dir: Cache root directory.
            default: Value returned when the file does not exist.

        Returns:
            The loaded object, or ``default`` if the file is missing.

        Raises:
            ValueError: If the file exists but its extension is unsupported.
        """
        filepath = self._resolve_base(base_cache_dir) / subdir / filename

        if not filepath.exists():
            return default

        ext = filepath.suffix.lower()
        if ext == '.csv':
            return pd.read_csv(filepath, encoding='utf-8-sig')
        elif ext == '.json':
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        elif ext in ['.txt', '.log', '.md']:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            raise ValueError(f"不支持的文件类型: {ext}")


# Global singleton
cache = CacheManager()