session_key_service.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. """
  2. 会话密钥服务
  3. 提供动态会话密钥的生成、存储和验证功能
  4. """
  5. import os
  6. import time
  7. import secrets
  8. from typing import Dict, Optional, Tuple
  9. class SessionKeyService:
  10. """会话密钥服务类"""
  11. def __init__(self):
  12. # 存储会话密钥的内存字典,格式:{key_id: (key, expiry_time)}
  13. self.session_keys: Dict[str, Tuple[str, float]] = {}
  14. # 密钥有效期(秒)
  15. self.key_expiry_seconds = 180 # 3分钟
  16. def generate_session_key(self) -> Tuple[str, str]:
  17. """
  18. 生成会话密钥
  19. Returns:
  20. Tuple[str, str]: (key_id, session_key)
  21. """
  22. # 生成唯一的key_id
  23. key_id = secrets.token_urlsafe(16)
  24. # 生成32位的AES密钥
  25. session_key = secrets.token_hex(16) # 16字节 = 32字符
  26. # 计算过期时间
  27. expiry_time = time.time() + self.key_expiry_seconds
  28. # 存储密钥
  29. self.session_keys[key_id] = (session_key, expiry_time)
  30. # 清理过期密钥
  31. self._clean_expired_keys()
  32. return key_id, session_key
  33. def get_session_key(self, key_id: str) -> Optional[str]:
  34. """
  35. 获取会话密钥
  36. Args:
  37. key_id: 密钥ID
  38. Returns:
  39. Optional[str]: 会话密钥,如果不存在或已过期则返回None
  40. """
  41. # 清理过期密钥
  42. self._clean_expired_keys()
  43. # 获取密钥
  44. key_info = self.session_keys.get(key_id)
  45. if not key_info:
  46. return None
  47. session_key, expiry_time = key_info
  48. # 检查是否过期
  49. if time.time() > expiry_time:
  50. # 删除过期密钥
  51. del self.session_keys[key_id]
  52. return None
  53. # 确保返回的会话密钥是字符串类型
  54. return str(session_key)
  55. def remove_session_key(self, key_id: str) -> None:
  56. """
  57. 删除会话密钥
  58. Args:
  59. key_id: 密钥ID
  60. """
  61. if key_id in self.session_keys:
  62. del self.session_keys[key_id]
  63. def _clean_expired_keys(self) -> None:
  64. """
  65. 清理过期的会话密钥
  66. """
  67. current_time = time.time()
  68. expired_keys = [
  69. key_id for key_id, (_, expiry_time) in self.session_keys.items()
  70. if current_time > expiry_time
  71. ]
  72. for key_id in expired_keys:
  73. del self.session_keys[key_id]
  74. # 全局实例
  75. session_key_service = SessionKeyService()