crypto_utils.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """
  2. 加密工具模块
  3. 用于本地模型API Key的AES加密存储和平台API Key的SHA256哈希存储
  4. """
  5. import os
  6. import base64
  7. import hashlib
  8. import secrets
  9. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  10. from cryptography.hazmat.backends import default_backend
  11. # 默认加密密钥(仅用于开发环境,生产环境必须设置ENCRYPTION_KEY环境变量)
  12. DEFAULT_ENCRYPTION_KEY = "wxcz-aigc-default-encryption-key"
  13. def _get_encryption_key() -> bytes:
  14. """
  15. 获取AES加密密钥
  16. 从环境变量ENCRYPTION_KEY读取,如果未设置则使用默认密钥
  17. 返回32字节的密钥(用于AES-256)
  18. """
  19. key_str = os.environ.get("ENCRYPTION_KEY", DEFAULT_ENCRYPTION_KEY)
  20. # 使用SHA256确保密钥长度为32字节
  21. return hashlib.sha256(key_str.encode()).digest()
  22. def encrypt_api_key(plain_text: str) -> str:
  23. """
  24. 使用AES-256-CBC模式加密API Key
  25. Args:
  26. plain_text: 需要加密的明文字符串
  27. Returns:
  28. Base64编码的密文字符串(格式:iv + 密文)
  29. """
  30. if not plain_text:
  31. return ""
  32. key = _get_encryption_key()
  33. # 生成随机16字节IV
  34. iv = os.urandom(16)
  35. # 创建AES-256-CBC加密器
  36. cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
  37. encryptor = cipher.encryptor()
  38. # PKCS7填充
  39. plain_bytes = plain_text.encode('utf-8')
  40. block_size = 16
  41. padding_length = block_size - (len(plain_bytes) % block_size)
  42. padded_data = plain_bytes + bytes([padding_length] * padding_length)
  43. # 加密
  44. cipher_text = encryptor.update(padded_data) + encryptor.finalize()
  45. # 将IV和密文拼接后进行Base64编码
  46. return base64.b64encode(iv + cipher_text).decode('utf-8')
  47. def decrypt_api_key(cipher_text: str) -> str:
  48. """
  49. 解密AES-256-CBC加密的API Key
  50. Args:
  51. cipher_text: Base64编码的密文字符串
  52. Returns:
  53. 解密后的明文字符串
  54. """
  55. if not cipher_text:
  56. return ""
  57. key = _get_encryption_key()
  58. # Base64解码
  59. encrypted_data = base64.b64decode(cipher_text)
  60. # 提取IV(前16字节)和密文
  61. iv = encrypted_data[:16]
  62. actual_cipher_text = encrypted_data[16:]
  63. # 创建AES-256-CBC解密器
  64. cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
  65. decryptor = cipher.decryptor()
  66. # 解密
  67. padded_data = decryptor.update(actual_cipher_text) + decryptor.finalize()
  68. # 移除PKCS7填充
  69. padding_length = padded_data[-1]
  70. plain_bytes = padded_data[:-padding_length]
  71. return plain_bytes.decode('utf-8')
  72. def get_effective_api_key(db, model_code: str, user_api_key: str) -> str:
  73. """
  74. 获取有效的 API Key。
  75. 优先使用模型自带的 encrypted_api_key(爬虫同步的),
  76. 没有或解密失败则 fallback 到用户自己配置的 apikey。
  77. Args:
  78. db: 数据库会话
  79. model_code: 模型标识(model_code)
  80. user_api_key: 用户自己配置的 apikey
  81. Returns:
  82. 有效的 API Key 字符串
  83. """
  84. try:
  85. from app.models.model import ModelNew
  86. model_obj = db.query(ModelNew).filter(
  87. ModelNew.model_code == model_code,
  88. ModelNew.is_local == False
  89. ).first()
  90. if model_obj and model_obj.encrypted_api_key:
  91. decrypted = decrypt_api_key(model_obj.encrypted_api_key)
  92. if decrypted:
  93. return decrypted
  94. except Exception:
  95. pass
  96. return user_api_key
  97. def hash_api_key(api_key: str) -> str:
  98. """
  99. 使用SHA256算法对API Key进行哈希
  100. 用于平台API Key的安全存储(单向哈希,不可逆)
  101. Args:
  102. api_key: 需要哈希的API Key明文
  103. Returns:
  104. SHA256哈希后的十六进制字符串
  105. """
  106. if not api_key:
  107. return ""
  108. return hashlib.sha256(api_key.encode('utf-8')).hexdigest()
  109. def verify_api_key_hash(api_key: str, hashed: str) -> bool:
  110. """
  111. 验证API Key是否与哈希值匹配
  112. Args:
  113. api_key: API Key明文
  114. hashed: 存储的SHA256哈希值
  115. Returns:
  116. True 如果匹配,False 如果不匹配
  117. """
  118. if not api_key or not hashed:
  119. return False
  120. return hash_api_key(api_key) == hashed
  121. # API Key前缀常量
  122. API_KEY_PREFIX = "sk-aigc-"
  123. # API Key总长度(包含前缀)
  124. API_KEY_TOTAL_LENGTH = 48
  125. def generate_platform_api_key() -> tuple[str, str]:
  126. """
  127. 生成平台API Key
  128. 生成以 "sk-aigc-" 为前缀、总长度48字符的安全随机密钥
  129. 使用 secrets 模块生成密码学安全的随机字符串
  130. Returns:
  131. tuple[str, str]: (完整密钥, 显示前缀)
  132. - 完整密钥格式:sk-aigc-{40字符随机字符串}
  133. - 显示前缀格式:sk-aigc-xxxx...xxxx(显示前4位和后4位随机部分)
  134. Example:
  135. >>> full_key, display_prefix = generate_platform_api_key()
  136. >>> full_key
  137. 'sk-aigc-a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0'
  138. >>> display_prefix
  139. 'sk-aigc-a1b2...s9t0'
  140. """
  141. # 计算随机部分的长度(总长度 - 前缀长度)
  142. random_length = API_KEY_TOTAL_LENGTH - len(API_KEY_PREFIX)
  143. # 使用 secrets.token_hex 生成安全随机字符串
  144. # token_hex 返回的是十六进制字符串,每字节产生2个字符
  145. # 所以需要 random_length // 2 字节
  146. random_part = secrets.token_hex(random_length // 2)
  147. # 确保随机部分长度正确(处理奇数长度情况)
  148. random_part = random_part[:random_length]
  149. # 组合完整密钥
  150. full_key = f"{API_KEY_PREFIX}{random_part}"
  151. # 生成显示前缀:sk-aigc-xxxx...xxxx(显示前4位和后4位随机部分)
  152. display_prefix = f"{API_KEY_PREFIX}{random_part[:4]}...{random_part[-4:]}"
  153. return full_key, display_prefix