""" SSO Token 内存缓存服务 缓存 token → 用户信息映射,减少对 SSO 中心的请求压力。 支持可配置的 TTL 过期策略。 """ import time import threading from dataclasses import dataclass, field from typing import Dict, Any, Optional @dataclass class CacheEntry: """缓存条目,包含用户信息和创建时间""" user_info: Dict[str, Any] created_at: float = field(default_factory=time.time) def is_expired(self, ttl: float) -> bool: """检查缓存条目是否已过期""" return (time.time() - self.created_at) > ttl class TokenCacheService: """ SSO Token 内存缓存服务 通过缓存 token → 用户信息的映射,避免每次 API 请求都调用 SSO 中心验证。 缓存条目在 TTL 过期后自动失效,下次访问时会被清除。 """ def __init__(self, ttl_seconds: int = 300): """ Args: ttl_seconds: 缓存过期时间(秒),默认 300 秒(5 分钟) """ self._cache: Dict[str, CacheEntry] = {} self._ttl = ttl_seconds self._lock = threading.Lock() def get(self, token: str) -> Optional[Dict[str, Any]]: """ 查询缓存,返回用户信息或 None。 如果缓存条目已过期,自动删除并返回 None。 Args: token: SSO access_token Returns: 用户信息字典,或 None(未命中/已过期) """ with self._lock: entry = self._cache.get(token) if entry is None: return None if entry.is_expired(self._ttl): del self._cache[token] return None return entry.user_info def set(self, token: str, user_info: Dict[str, Any]) -> None: """ 写入缓存。 Args: token: SSO access_token user_info: 用户信息字典 """ with self._lock: self._cache[token] = CacheEntry(user_info=user_info) def invalidate(self, token: str) -> None: """ 使指定 token 的缓存失效。 Args: token: SSO access_token """ with self._lock: self._cache.pop(token, None) def clear(self) -> None: """清空所有缓存""" with self._lock: self._cache.clear()