| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 缓存管理模块 - 提供统一的缓存文件路径管理和操作接口
- 使用方式:
- from foundation.observability.cachefiles import cache, CacheBaseDir
- # 保存数据(必须指定根目录,文件名自动使用变量名)
- catogues_result = {"key": "value"}
- cache.document_temp(catogues_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
- # → temp/construction_review/document_temp/catogues_result.json
- cache.document_temp(catogues_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_WRITE)
- # → temp/construction_write/document_temp/catogues_result.json
- """
- from __future__ import annotations
- import inspect
- import json
- from enum import Enum
- from pathlib import Path
- from typing import Any, Optional, Union
- import pandas as pd
- from foundation.observability.logger.loggering import review_logger as logger
class CacheBaseDir(str, Enum):
    """Root directories under which cache files are grouped.

    Members are also ``str`` instances, and each value is a relative path.
    """

    # Cache root selected by the CONSTRUCTION_REVIEW member.
    CONSTRUCTION_REVIEW = "temp/construction_review"

    # Cache root selected by the CONSTRUCTION_WRITE member.
    CONSTRUCTION_WRITE = "temp/construction_write"

    # Cache root selected by the RAG_MONITORING member.
    RAG_MONITORING = "temp/rag_monitoring"
class _SubDirCaller:
    """Callable bound to a single cache sub-directory.

    Calling an instance saves ``data`` under ``<base_cache_dir>/<subdir>/``.
    The file stem is the name of the caller's local variable holding ``data``
    and the extension is derived from the data type, so the file can later be
    read back by ``CacheManager.load`` (which dispatches on the suffix).
    """

    def __init__(self, subdir: str):
        """Remember the sub-directory this caller writes into."""
        self.subdir = subdir

    def __call__(
        self,
        data: Any,
        base_cache_dir: Union[str, CacheBaseDir],
    ) -> Path:
        """Save ``data`` using the caller's variable name as the file name.

        Args:
            data: Object to persist (DataFrame, dict, list or str).
            base_cache_dir: Cache root directory (required).

        Returns:
            Path of the file written by ``cache.save``.

        Raises:
            TypeError: Propagated from ``cache.save`` for unsupported types.
        """
        # _get_var_name walks exactly two frames up, so it must be invoked
        # directly from __call__ and not through another helper.
        stem = self._get_var_name(data)
        filename = stem + self._suffix_for(data)
        return cache.save(data, self.subdir, filename, base_cache_dir)

    @staticmethod
    def _suffix_for(data: Any) -> str:
        """Return the file extension matching how ``data`` gets serialised.

        The mapping mirrors the type dispatch in ``CacheManager.save`` and the
        suffix dispatch in ``CacheManager.load``. Unsupported types yield an
        empty suffix; ``save`` rejects them with ``TypeError`` anyway.
        """
        if isinstance(data, pd.DataFrame):
            return ".csv"
        if isinstance(data, (dict, list)):
            return ".json"
        if isinstance(data, str):
            return ".txt"
        return ""

    def _get_var_name(self, data: Any) -> str:
        """Return the name bound to ``data`` in the frame that invoked ``__call__``.

        Falls back to ``"data"`` when no local in that frame is the same object
        (e.g. a literal or expression was passed) or when frame introspection
        is unavailable.
        """
        frame = inspect.currentframe()
        call_frame = None
        caller_frame = None
        try:
            # Two hops back: _get_var_name -> __call__ -> actual call site.
            # currentframe() and f_back may each be None, so guard every hop.
            if frame is not None:
                call_frame = frame.f_back
            if call_frame is not None:
                caller_frame = call_frame.f_back
            if caller_frame is None:
                return "data"

            # Identity (not equality) so that equal-but-distinct objects in
            # the caller's scope are not mistaken for the argument.
            for name, value in caller_frame.f_locals.items():
                if value is data:
                    return name
            return "data"
        finally:
            # Drop frame references promptly to avoid reference cycles.
            frame = call_frame = caller_frame = None
class CacheManager:
    """Global cache manager (singleton).

    Configured sub-directories are exposed as attributes: ``cache.<name>``
    returns a ``_SubDirCaller`` bound to that sub-directory. ``save`` and
    ``load`` give explicit control over sub-directory, file name and root.
    """

    _instance: Optional["CacheManager"] = None

    def __new__(cls) -> "CacheManager":
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._initialized = False
            cls._instance = instance
        return cls._instance

    def __init__(self):
        """Load the sub-directory table and create directories once."""
        if self._initialized:
            return

        from .cache_files import SUBDIRS

        # Presumably a mapping of attribute name -> sub-directory name; it is
        # used below via ``in``, ``[]``, ``.keys()`` and ``.values()``.
        self._subdirs = SUBDIRS
        self._ensure_directories()
        # Mark as initialised only after everything succeeded, so a failed
        # import or mkdir is retried on the next CacheManager() call instead
        # of leaving a half-built singleton behind.
        self._initialized = True

    def _ensure_directories(self):
        """Make sure the cache directories exist.

        Every root in ``CacheBaseDir`` is created. Sub-directories are only
        pre-created under ``CONSTRUCTION_REVIEW``; for the other roots
        ``save`` creates parent directories on demand.
        """
        for base_dir in CacheBaseDir:
            Path(base_dir.value).mkdir(parents=True, exist_ok=True)

        base = Path(CacheBaseDir.CONSTRUCTION_REVIEW.value)
        for subdir in self._subdirs.values():
            (base / subdir).mkdir(parents=True, exist_ok=True)

        logger.info("[缓存] 缓存目录已初始化")

    def __getattr__(self, subdir: str) -> _SubDirCaller:
        """Resolve ``cache.<subdir>`` to a caller for that sub-directory.

        Raises:
            AttributeError: If ``subdir`` is not a configured sub-directory.
        """
        # Read from __dict__ directly: going through ``self._subdirs`` while
        # it is not yet set would re-enter __getattr__ and recurse forever.
        subdirs = self.__dict__.get("_subdirs")
        if subdirs is not None and subdir in subdirs:
            return _SubDirCaller(subdirs[subdir])

        available = list(subdirs.keys()) if subdirs is not None else []
        raise AttributeError(
            f"'CacheManager' has no subdir '{subdir}'. "
            f"Available: {available}"
        )

    @staticmethod
    def _resolve_base(base_cache_dir: Union[str, CacheBaseDir]) -> Path:
        """Convert a cache root given as enum member or plain string to a Path."""
        if isinstance(base_cache_dir, CacheBaseDir):
            return Path(base_cache_dir.value)
        return Path(base_cache_dir)

    def save(
        self,
        data: Any,
        subdir: str,
        filename: str,
        base_cache_dir: Union[str, CacheBaseDir] = CacheBaseDir.CONSTRUCTION_REVIEW,
    ) -> Path:
        """Write ``data`` to ``<base_cache_dir>/<subdir>/<filename>``.

        DataFrames are written as CSV (UTF-8 with BOM, no index), dicts and
        lists as indented JSON, and strings verbatim as UTF-8 text.

        Args:
            data: Object to persist.
            subdir: Sub-directory below the cache root.
            filename: Target file name, used as given.
            base_cache_dir: Cache root, as enum member or path string.

        Returns:
            Path of the written file.

        Raises:
            TypeError: If ``data`` is not a DataFrame, dict, list or str.
        """
        filepath = self._resolve_base(base_cache_dir) / subdir / filename
        filepath.parent.mkdir(parents=True, exist_ok=True)

        if isinstance(data, pd.DataFrame):
            data.to_csv(filepath, index=False, encoding='utf-8-sig')
        elif isinstance(data, (dict, list)):
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        elif isinstance(data, str):
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(data)
        else:
            raise TypeError(f"不支持的数据类型: {type(data)}")

        logger.debug(f"[缓存] 已保存: {filepath}")
        return filepath

    def load(
        self,
        subdir: str,
        filename: str,
        base_cache_dir: Union[str, CacheBaseDir] = CacheBaseDir.CONSTRUCTION_REVIEW,
        default: Any = None,
    ) -> Any:
        """Read ``<base_cache_dir>/<subdir>/<filename>`` back into memory.

        The loader is chosen by file suffix: ``.csv`` gives a DataFrame,
        ``.json`` the parsed JSON value, ``.txt``/``.log``/``.md`` the text.

        Args:
            subdir: Sub-directory below the cache root.
            filename: File name including its suffix.
            base_cache_dir: Cache root, as enum member or path string.
            default: Value returned when the file does not exist.

        Returns:
            The loaded content, or ``default`` if the file is missing.

        Raises:
            ValueError: If the file exists but its suffix is not supported.
        """
        filepath = self._resolve_base(base_cache_dir) / subdir / filename
        if not filepath.exists():
            return default

        ext = filepath.suffix.lower()
        if ext == '.csv':
            return pd.read_csv(filepath, encoding='utf-8-sig')
        if ext == '.json':
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        if ext in ['.txt', '.log', '.md']:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read()
        raise ValueError(f"不支持的文件类型: {ext}")
# Global singleton: constructed once when this module is imported, which also
# runs CacheManager.__init__ (sub-directory table load and directory creation).
cache = CacheManager()
|