| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- """
- SSO Token 内存缓存服务
- 缓存 token → 用户信息映射,减少对 SSO 中心的请求压力。
- 支持可配置的 TTL 过期策略。
- """
- import time
- import threading
- from dataclasses import dataclass, field
- from typing import Dict, Any, Optional
- @dataclass
- class CacheEntry:
- """缓存条目,包含用户信息和创建时间"""
- user_info: Dict[str, Any]
- created_at: float = field(default_factory=time.time)
- def is_expired(self, ttl: float) -> bool:
- """检查缓存条目是否已过期"""
- return (time.time() - self.created_at) > ttl
- class TokenCacheService:
- """
- SSO Token 内存缓存服务
- 通过缓存 token → 用户信息的映射,避免每次 API 请求都调用 SSO 中心验证。
- 缓存条目在 TTL 过期后自动失效,下次访问时会被清除。
- """
- def __init__(self, ttl_seconds: int = 300):
- """
- Args:
- ttl_seconds: 缓存过期时间(秒),默认 300 秒(5 分钟)
- """
- self._cache: Dict[str, CacheEntry] = {}
- self._ttl = ttl_seconds
- self._lock = threading.Lock()
- def get(self, token: str) -> Optional[Dict[str, Any]]:
- """
- 查询缓存,返回用户信息或 None。
- 如果缓存条目已过期,自动删除并返回 None。
- Args:
- token: SSO access_token
- Returns:
- 用户信息字典,或 None(未命中/已过期)
- """
- with self._lock:
- entry = self._cache.get(token)
- if entry is None:
- return None
- if entry.is_expired(self._ttl):
- del self._cache[token]
- return None
- return entry.user_info
- def set(self, token: str, user_info: Dict[str, Any]) -> None:
- """
- 写入缓存。
- Args:
- token: SSO access_token
- user_info: 用户信息字典
- """
- with self._lock:
- self._cache[token] = CacheEntry(user_info=user_info)
- def invalidate(self, token: str) -> None:
- """
- 使指定 token 的缓存失效。
- Args:
- token: SSO access_token
- """
- with self._lock:
- self._cache.pop(token, None)
- def clear(self) -> None:
- """清空所有缓存"""
- with self._lock:
- self._cache.clear()
|