| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- """
- 加密工具模块
- 用于本地模型API Key的AES加密存储和平台API Key的SHA256哈希存储
- """
- import os
- import base64
- import hashlib
- import secrets
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from cryptography.hazmat.backends import default_backend
- # 默认加密密钥(仅用于开发环境,生产环境必须设置ENCRYPTION_KEY环境变量)
- DEFAULT_ENCRYPTION_KEY = "wxcz-aigc-default-encryption-key"
- def _get_encryption_key() -> bytes:
- """
- 获取AES加密密钥
- 从环境变量ENCRYPTION_KEY读取,如果未设置则使用默认密钥
- 返回32字节的密钥(用于AES-256)
- """
- key_str = os.environ.get("ENCRYPTION_KEY", DEFAULT_ENCRYPTION_KEY)
- # 使用SHA256确保密钥长度为32字节
- return hashlib.sha256(key_str.encode()).digest()
- def encrypt_api_key(plain_text: str) -> str:
- """
- 使用AES-256-CBC模式加密API Key
-
- Args:
- plain_text: 需要加密的明文字符串
-
- Returns:
- Base64编码的密文字符串(格式:iv + 密文)
- """
- if not plain_text:
- return ""
-
- key = _get_encryption_key()
- # 生成随机16字节IV
- iv = os.urandom(16)
-
- # 创建AES-256-CBC加密器
- cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
- encryptor = cipher.encryptor()
-
- # PKCS7填充
- plain_bytes = plain_text.encode('utf-8')
- block_size = 16
- padding_length = block_size - (len(plain_bytes) % block_size)
- padded_data = plain_bytes + bytes([padding_length] * padding_length)
-
- # 加密
- cipher_text = encryptor.update(padded_data) + encryptor.finalize()
-
- # 将IV和密文拼接后进行Base64编码
- return base64.b64encode(iv + cipher_text).decode('utf-8')
- def decrypt_api_key(cipher_text: str) -> str:
- """
- 解密AES-256-CBC加密的API Key
-
- Args:
- cipher_text: Base64编码的密文字符串
-
- Returns:
- 解密后的明文字符串
- """
- if not cipher_text:
- return ""
-
- key = _get_encryption_key()
-
- # Base64解码
- encrypted_data = base64.b64decode(cipher_text)
-
- # 提取IV(前16字节)和密文
- iv = encrypted_data[:16]
- actual_cipher_text = encrypted_data[16:]
-
- # 创建AES-256-CBC解密器
- cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
- decryptor = cipher.decryptor()
-
- # 解密
- padded_data = decryptor.update(actual_cipher_text) + decryptor.finalize()
-
- # 移除PKCS7填充
- padding_length = padded_data[-1]
- plain_bytes = padded_data[:-padding_length]
-
- return plain_bytes.decode('utf-8')
- def get_effective_api_key(db, model_code: str, user_api_key: str) -> str:
- """
- 获取有效的 API Key。
- 优先使用模型自带的 encrypted_api_key(爬虫同步的),
- 没有或解密失败则 fallback 到用户自己配置的 apikey。
- Args:
- db: 数据库会话
- model_code: 模型标识(model_code)
- user_api_key: 用户自己配置的 apikey
- Returns:
- 有效的 API Key 字符串
- """
- try:
- from app.models.model import ModelNew
- model_obj = db.query(ModelNew).filter(
- ModelNew.model_code == model_code,
- ModelNew.is_local == False
- ).first()
- if model_obj and model_obj.encrypted_api_key:
- decrypted = decrypt_api_key(model_obj.encrypted_api_key)
- if decrypted:
- return decrypted
- except Exception:
- pass
- return user_api_key
- def hash_api_key(api_key: str) -> str:
- """
- 使用SHA256算法对API Key进行哈希
- 用于平台API Key的安全存储(单向哈希,不可逆)
-
- Args:
- api_key: 需要哈希的API Key明文
-
- Returns:
- SHA256哈希后的十六进制字符串
- """
- if not api_key:
- return ""
- return hashlib.sha256(api_key.encode('utf-8')).hexdigest()
- def verify_api_key_hash(api_key: str, hashed: str) -> bool:
- """
- 验证API Key是否与哈希值匹配
-
- Args:
- api_key: API Key明文
- hashed: 存储的SHA256哈希值
-
- Returns:
- True 如果匹配,False 如果不匹配
- """
- if not api_key or not hashed:
- return False
- return hash_api_key(api_key) == hashed
- # API Key前缀常量
- API_KEY_PREFIX = "sk-aigc-"
- # API Key总长度(包含前缀)
- API_KEY_TOTAL_LENGTH = 48
- def generate_platform_api_key() -> tuple[str, str]:
- """
- 生成平台API Key
-
- 生成以 "sk-aigc-" 为前缀、总长度48字符的安全随机密钥
- 使用 secrets 模块生成密码学安全的随机字符串
-
- Returns:
- tuple[str, str]: (完整密钥, 显示前缀)
- - 完整密钥格式:sk-aigc-{40字符随机字符串}
- - 显示前缀格式:sk-aigc-xxxx...xxxx(显示前4位和后4位随机部分)
-
- Example:
- >>> full_key, display_prefix = generate_platform_api_key()
- >>> full_key
- 'sk-aigc-a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0'
- >>> display_prefix
- 'sk-aigc-a1b2...s9t0'
- """
- # 计算随机部分的长度(总长度 - 前缀长度)
- random_length = API_KEY_TOTAL_LENGTH - len(API_KEY_PREFIX)
-
- # 使用 secrets.token_hex 生成安全随机字符串
- # token_hex 返回的是十六进制字符串,每字节产生2个字符
- # 所以需要 random_length // 2 字节
- random_part = secrets.token_hex(random_length // 2)
-
- # 确保随机部分长度正确(处理奇数长度情况)
- random_part = random_part[:random_length]
-
- # 组合完整密钥
- full_key = f"{API_KEY_PREFIX}{random_part}"
-
- # 生成显示前缀:sk-aigc-xxxx...xxxx(显示前4位和后4位随机部分)
- display_prefix = f"{API_KEY_PREFIX}{random_part[:4]}...{random_part[-4:]}"
-
- return full_key, display_prefix
|