""" 加密工具模块 用于本地模型API Key的AES加密存储和平台API Key的SHA256哈希存储 """ import os import base64 import hashlib import secrets from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend # 默认加密密钥(仅用于开发环境,生产环境必须设置ENCRYPTION_KEY环境变量) DEFAULT_ENCRYPTION_KEY = "wxcz-aigc-default-encryption-key" def _get_encryption_key() -> bytes: """ 获取AES加密密钥 从环境变量ENCRYPTION_KEY读取,如果未设置则使用默认密钥 返回32字节的密钥(用于AES-256) """ key_str = os.environ.get("ENCRYPTION_KEY", DEFAULT_ENCRYPTION_KEY) # 使用SHA256确保密钥长度为32字节 return hashlib.sha256(key_str.encode()).digest() def encrypt_api_key(plain_text: str) -> str: """ 使用AES-256-CBC模式加密API Key Args: plain_text: 需要加密的明文字符串 Returns: Base64编码的密文字符串(格式:iv + 密文) """ if not plain_text: return "" key = _get_encryption_key() # 生成随机16字节IV iv = os.urandom(16) # 创建AES-256-CBC加密器 cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) encryptor = cipher.encryptor() # PKCS7填充 plain_bytes = plain_text.encode('utf-8') block_size = 16 padding_length = block_size - (len(plain_bytes) % block_size) padded_data = plain_bytes + bytes([padding_length] * padding_length) # 加密 cipher_text = encryptor.update(padded_data) + encryptor.finalize() # 将IV和密文拼接后进行Base64编码 return base64.b64encode(iv + cipher_text).decode('utf-8') def decrypt_api_key(cipher_text: str) -> str: """ 解密AES-256-CBC加密的API Key Args: cipher_text: Base64编码的密文字符串 Returns: 解密后的明文字符串 """ if not cipher_text: return "" key = _get_encryption_key() # Base64解码 encrypted_data = base64.b64decode(cipher_text) # 提取IV(前16字节)和密文 iv = encrypted_data[:16] actual_cipher_text = encrypted_data[16:] # 创建AES-256-CBC解密器 cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) decryptor = cipher.decryptor() # 解密 padded_data = decryptor.update(actual_cipher_text) + decryptor.finalize() # 移除PKCS7填充 padding_length = padded_data[-1] plain_bytes = padded_data[:-padding_length] return plain_bytes.decode('utf-8') def get_effective_api_key(db, model_code: str, user_api_key: str) -> str: """ 获取有效的 API Key。 优先使用模型自带的 encrypted_api_key(爬虫同步的), 没有或解密失败则 fallback 到用户自己配置的 apikey。 Args: db: 数据库会话 model_code: 模型标识(model_code) user_api_key: 用户自己配置的 apikey Returns: 有效的 API Key 字符串 """ try: from app.models.model import ModelNew model_obj = db.query(ModelNew).filter( ModelNew.model_code == model_code, ModelNew.is_local == False ).first() if model_obj and model_obj.encrypted_api_key: decrypted = decrypt_api_key(model_obj.encrypted_api_key) if decrypted: return decrypted except Exception: pass return user_api_key def hash_api_key(api_key: str) -> str: """ 使用SHA256算法对API Key进行哈希 用于平台API Key的安全存储(单向哈希,不可逆) Args: api_key: 需要哈希的API Key明文 Returns: SHA256哈希后的十六进制字符串 """ if not api_key: return "" return hashlib.sha256(api_key.encode('utf-8')).hexdigest() def verify_api_key_hash(api_key: str, hashed: str) -> bool: """ 验证API Key是否与哈希值匹配 Args: api_key: API Key明文 hashed: 存储的SHA256哈希值 Returns: True 如果匹配,False 如果不匹配 """ if not api_key or not hashed: return False return hash_api_key(api_key) == hashed # API Key前缀常量 API_KEY_PREFIX = "sk-aigc-" # API Key总长度(包含前缀) API_KEY_TOTAL_LENGTH = 48 def generate_platform_api_key() -> tuple[str, str]: """ 生成平台API Key 生成以 "sk-aigc-" 为前缀、总长度48字符的安全随机密钥 使用 secrets 模块生成密码学安全的随机字符串 Returns: tuple[str, str]: (完整密钥, 显示前缀) - 完整密钥格式:sk-aigc-{40字符随机字符串} - 显示前缀格式:sk-aigc-xxxx...xxxx(显示前4位和后4位随机部分) Example: >>> full_key, display_prefix = generate_platform_api_key() >>> full_key 'sk-aigc-a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0' >>> display_prefix 'sk-aigc-a1b2...s9t0' """ # 计算随机部分的长度(总长度 - 前缀长度) random_length = API_KEY_TOTAL_LENGTH - len(API_KEY_PREFIX) # 使用 secrets.token_hex 生成安全随机字符串 # token_hex 返回的是十六进制字符串,每字节产生2个字符 # 所以需要 random_length // 2 字节 random_part = secrets.token_hex(random_length // 2) # 确保随机部分长度正确(处理奇数长度情况) random_part = random_part[:random_length] # 组合完整密钥 full_key = f"{API_KEY_PREFIX}{random_part}" # 生成显示前缀:sk-aigc-xxxx...xxxx(显示前4位和后4位随机部分) display_prefix = f"{API_KEY_PREFIX}{random_part[:4]}...{random_part[-4:]}" return full_key, display_prefix