""" 会话密钥服务 提供动态会话密钥的生成、存储和验证功能 """ import os import time import secrets from typing import Dict, Optional, Tuple class SessionKeyService: """会话密钥服务类""" def __init__(self): # 存储会话密钥的内存字典,格式:{key_id: (key, expiry_time)} self.session_keys: Dict[str, Tuple[str, float]] = {} # 密钥有效期(秒) self.key_expiry_seconds = 180 # 3分钟 def generate_session_key(self) -> Tuple[str, str]: """ 生成会话密钥 Returns: Tuple[str, str]: (key_id, session_key) """ # 生成唯一的key_id key_id = secrets.token_urlsafe(16) # 生成32位的AES密钥 session_key = secrets.token_hex(16) # 16字节 = 32字符 # 计算过期时间 expiry_time = time.time() + self.key_expiry_seconds # 存储密钥 self.session_keys[key_id] = (session_key, expiry_time) # 清理过期密钥 self._clean_expired_keys() return key_id, session_key def get_session_key(self, key_id: str) -> Optional[str]: """ 获取会话密钥 Args: key_id: 密钥ID Returns: Optional[str]: 会话密钥,如果不存在或已过期则返回None """ # 清理过期密钥 self._clean_expired_keys() # 获取密钥 key_info = self.session_keys.get(key_id) if not key_info: return None session_key, expiry_time = key_info # 检查是否过期 if time.time() > expiry_time: # 删除过期密钥 del self.session_keys[key_id] return None # 确保返回的会话密钥是字符串类型 return str(session_key) def remove_session_key(self, key_id: str) -> None: """ 删除会话密钥 Args: key_id: 密钥ID """ if key_id in self.session_keys: del self.session_keys[key_id] def _clean_expired_keys(self) -> None: """ 清理过期的会话密钥 """ current_time = time.time() expired_keys = [ key_id for key_id, (_, expiry_time) in self.session_keys.items() if current_time > expiry_time ] for key_id in expired_keys: del self.session_keys[key_id] # 全局实例 session_key_service = SessionKeyService()