| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- """
- 会话密钥服务
- 提供动态会话密钥的生成、存储和验证功能
- """
- import os
- import time
- import secrets
- from typing import Dict, Optional, Tuple
- class SessionKeyService:
- """会话密钥服务类"""
-
- def __init__(self):
- # 存储会话密钥的内存字典,格式:{key_id: (key, expiry_time)}
- self.session_keys: Dict[str, Tuple[str, float]] = {}
- # 密钥有效期(秒)
- self.key_expiry_seconds = 180 # 3分钟
-
- def generate_session_key(self) -> Tuple[str, str]:
- """
- 生成会话密钥
-
- Returns:
- Tuple[str, str]: (key_id, session_key)
- """
- # 生成唯一的key_id
- key_id = secrets.token_urlsafe(16)
- # 生成32位的AES密钥
- session_key = secrets.token_hex(16) # 16字节 = 32字符
- # 计算过期时间
- expiry_time = time.time() + self.key_expiry_seconds
- # 存储密钥
- self.session_keys[key_id] = (session_key, expiry_time)
- # 清理过期密钥
- self._clean_expired_keys()
- return key_id, session_key
-
- def get_session_key(self, key_id: str) -> Optional[str]:
- """
- 获取会话密钥
-
- Args:
- key_id: 密钥ID
-
- Returns:
- Optional[str]: 会话密钥,如果不存在或已过期则返回None
- """
- # 清理过期密钥
- self._clean_expired_keys()
- # 获取密钥
- key_info = self.session_keys.get(key_id)
- if not key_info:
- return None
-
- session_key, expiry_time = key_info
- # 检查是否过期
- if time.time() > expiry_time:
- # 删除过期密钥
- del self.session_keys[key_id]
- return None
-
- # 确保返回的会话密钥是字符串类型
- return str(session_key)
-
- def remove_session_key(self, key_id: str) -> None:
- """
- 删除会话密钥
-
- Args:
- key_id: 密钥ID
- """
- if key_id in self.session_keys:
- del self.session_keys[key_id]
-
- def _clean_expired_keys(self) -> None:
- """
- 清理过期的会话密钥
- """
- current_time = time.time()
- expired_keys = [
- key_id for key_id, (_, expiry_time) in self.session_keys.items()
- if current_time > expiry_time
- ]
- for key_id in expired_keys:
- del self.session_keys[key_id]
- # 全局实例
- session_key_service = SessionKeyService()
|