cache_manager.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 缓存管理模块 - 提供统一的缓存文件路径管理和操作接口
  5. 使用方式:
  6. from foundation.observability.cachefiles import cache, CacheBaseDir
  7. # 保存数据(必须指定根目录,文件名自动使用变量名)
  8. catogues_result = {"key": "value"}
  9. cache.document_temp(catogues_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
  10. # → temp/construction_review/document_temp/catogues_result.json
  11. cache.document_temp(catogues_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_WRITE)
  12. # → temp/construction_write/document_temp/catogues_result.json
  13. """
  14. from __future__ import annotations
  15. import inspect
  16. import json
  17. from enum import Enum
  18. from pathlib import Path
  19. from typing import Any, Optional, Union
  20. import pandas as pd
  21. from foundation.observability.logger.loggering import review_logger as logger
  22. class CacheBaseDir(str, Enum):
  23. """缓存主目录枚举"""
  24. CONSTRUCTION_REVIEW = "temp/construction_review"
  25. CONSTRUCTION_WRITE = "temp/construction_write"
  26. RAG_MONITORING = "temp/rag_monitoring"
  27. class _SubDirCaller:
  28. """子目录调用器"""
  29. def __init__(self, subdir: str):
  30. self.subdir = subdir
  31. def __call__(
  32. self,
  33. data: Any,
  34. base_cache_dir: Union[str, CacheBaseDir],
  35. ) -> Path:
  36. """
  37. 保存数据,文件名自动使用变量名
  38. Args:
  39. data: 要保存的数据
  40. base_cache_dir: 缓存根目录(必须指定)
  41. """
  42. filename = self._get_var_name(data)
  43. return cache.save(data, self.subdir, filename, base_cache_dir)
  44. def _get_var_name(self, data: Any) -> str:
  45. """获取变量名"""
  46. # 获取调用者的帧
  47. frame = inspect.currentframe()
  48. try:
  49. # 回退两帧:_get_var_name -> __call__ -> 实际调用处
  50. caller_frame = frame.f_back.f_back
  51. caller_locals = caller_frame.f_locals
  52. # 在locals中找到值为data的变量名
  53. for name, value in caller_locals.items():
  54. if value is data:
  55. return name
  56. # 找不到则返回默认名
  57. return "data"
  58. finally:
  59. del frame
  60. class CacheManager:
  61. """全局缓存管理器(单例)"""
  62. _instance: Optional["CacheManager"] = None
  63. def __new__(cls) -> "CacheManager":
  64. if cls._instance is None:
  65. cls._instance = super().__new__(cls)
  66. cls._instance._initialized = False
  67. return cls._instance
  68. def __init__(self):
  69. if self._initialized:
  70. return
  71. self._initialized = True
  72. from .cache_files import SUBDIRS
  73. self._subdirs = SUBDIRS
  74. self._ensure_directories()
  75. def _ensure_directories(self):
  76. """确保缓存目录存在"""
  77. for base_dir in CacheBaseDir:
  78. Path(base_dir.value).mkdir(parents=True, exist_ok=True)
  79. base = Path(CacheBaseDir.CONSTRUCTION_REVIEW.value)
  80. for subdir in self._subdirs.values():
  81. (base / subdir).mkdir(parents=True, exist_ok=True)
  82. logger.info("[缓存] 缓存目录已初始化")
  83. def __getattr__(self, subdir: str) -> _SubDirCaller:
  84. """访问子目录"""
  85. if subdir in self._subdirs:
  86. return _SubDirCaller(self._subdirs[subdir])
  87. raise AttributeError(
  88. f"'CacheManager' has no subdir '{subdir}'. "
  89. f"Available: {list(self._subdirs.keys())}"
  90. )
  91. def save(
  92. self,
  93. data: Any,
  94. subdir: str,
  95. filename: str,
  96. base_cache_dir: Union[str, CacheBaseDir] = CacheBaseDir.CONSTRUCTION_REVIEW,
  97. ) -> Path:
  98. """保存数据"""
  99. if isinstance(base_cache_dir, CacheBaseDir):
  100. base = Path(base_cache_dir.value)
  101. else:
  102. base = Path(base_cache_dir)
  103. filepath = base / subdir / filename
  104. filepath.parent.mkdir(parents=True, exist_ok=True)
  105. if isinstance(data, pd.DataFrame):
  106. data.to_csv(filepath, index=False, encoding='utf-8-sig')
  107. elif isinstance(data, (dict, list)):
  108. with open(filepath, 'w', encoding='utf-8') as f:
  109. json.dump(data, f, ensure_ascii=False, indent=2)
  110. elif isinstance(data, str):
  111. with open(filepath, 'w', encoding='utf-8') as f:
  112. f.write(data)
  113. else:
  114. raise TypeError(f"不支持的数据类型: {type(data)}")
  115. logger.debug(f"[缓存] 已保存: {filepath}")
  116. return filepath
  117. def load(
  118. self,
  119. subdir: str,
  120. filename: str,
  121. base_cache_dir: Union[str, CacheBaseDir] = CacheBaseDir.CONSTRUCTION_REVIEW,
  122. default: Any = None,
  123. ) -> Any:
  124. """加载数据"""
  125. if isinstance(base_cache_dir, CacheBaseDir):
  126. base = Path(base_cache_dir.value)
  127. else:
  128. base = Path(base_cache_dir)
  129. filepath = base / subdir / filename
  130. if not filepath.exists():
  131. return default
  132. ext = filepath.suffix.lower()
  133. if ext == '.csv':
  134. return pd.read_csv(filepath, encoding='utf-8-sig')
  135. elif ext == '.json':
  136. with open(filepath, 'r', encoding='utf-8') as f:
  137. return json.load(f)
  138. elif ext in ['.txt', '.log', '.md']:
  139. with open(filepath, 'r', encoding='utf-8') as f:
  140. return f.read()
  141. else:
  142. raise ValueError(f"不支持的文件类型: {ext}")
# Global singleton, created when this module is imported.
# _SubDirCaller.__call__ looks up this module-level name when saving.
cache = CacheManager()