"""AES-CFB helpers for encrypting and decrypting URLs as URL-safe tokens."""

import base64
import logging
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from .config import settings

logger = logging.getLogger(__name__)

# AES-128 key length in bytes; the configured key is truncated/padded to this.
_KEY_SIZE = 16
# The IV is one AES block (16 bytes) and is stored as the token's prefix.
_IV_SIZE = 16


def get_encrypt_key() -> bytes:
    """Return the 16-byte AES key derived from the configured key string.

    The configured value (``settings.oss.parse_encrypt_key``) is UTF-8
    encoded, truncated to 16 bytes, and right-padded with NUL bytes when
    shorter, so the result is always exactly 16 bytes long.
    """
    raw_key = settings.oss.parse_encrypt_key.encode("utf-8")
    return raw_key[:_KEY_SIZE].ljust(_KEY_SIZE, b"\0")


def _build_cipher(key: bytes, iv: bytes) -> Cipher:
    """Build the AES-CFB cipher shared by encryption and decryption."""
    return Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())


def encrypt_url(plain_url: str) -> str:
    """Encrypt a URL into a URL-safe Base64 token.

    Uses AES in CFB mode; per the original author's note this is meant to
    stay compatible with the Go implementation. The token layout is
    ``base64url(IV || ciphertext)`` with a fresh random 16-byte IV per call.

    NOTE(review): CFB provides confidentiality only; nothing here
    authenticates the ciphertext.

    Args:
        plain_url: The URL to encrypt.

    Returns:
        The encoded token, or ``""`` when ``plain_url`` is empty or
        encryption fails (the failure is logged, not raised).
    """
    if not plain_url:
        return ""

    try:
        iv = os.urandom(_IV_SIZE)
        encryptor = _build_cipher(get_encrypt_key(), iv).encryptor()
        encrypted = encryptor.update(plain_url.encode("utf-8")) + encryptor.finalize()
        # Prefix the IV so the decrypting side can recover it from the token.
        return base64.urlsafe_b64encode(iv + encrypted).decode("utf-8")
    except Exception as exc:
        # Best-effort contract: callers receive "" instead of an exception.
        logger.exception("加密失败: %s", exc)
        return ""


def decrypt_url(encrypted_url: str) -> str:
    """Decrypt a token produced by ``encrypt_url`` back into the URL.

    Expects ``base64url(IV || ciphertext)``: the first 16 decoded bytes are
    the IV and the remainder is the AES-CFB ciphertext. Tokens whose Base64
    ``=`` padding was stripped are accepted as well.

    Args:
        encrypted_url: The URL-safe Base64 token.

    Returns:
        The decrypted URL, or ``""`` when the input is empty, malformed,
        shorter than one IV, or does not decrypt to valid UTF-8 (the failure
        is logged, not raised).
    """
    if not encrypted_url:
        return ""

    try:
        # Restore any stripped "=" padding; correctly padded input gets none.
        padded = encrypted_url + "=" * (-len(encrypted_url) % 4)
        ciphertext = base64.urlsafe_b64decode(padded)
        if len(ciphertext) < _IV_SIZE:
            raise ValueError("密文长度不足")

        iv, encrypted = ciphertext[:_IV_SIZE], ciphertext[_IV_SIZE:]
        decryptor = _build_cipher(get_encrypt_key(), iv).decryptor()
        decrypted = decryptor.update(encrypted) + decryptor.finalize()
        return decrypted.decode("utf-8")
    except Exception as exc:
        # Bad tokens are expected for external input, so log without a traceback.
        logger.warning("解密失败: %s", exc)
        return ""