"""Encryption service.

Provides RSA asymmetric decryption and AES symmetric encryption, used to
protect sensitive data (e.g. national ID numbers).
"""
import base64
import os
from typing import Tuple

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


class EncryptionService:
    """Bundle an AES key and an RSA key pair behind simple string helpers.

    The AES key is read from the ``AES_ENCRYPTION_KEY`` environment variable
    (Base64); the RSA key pair is loaded from, or generated into, PEM files
    in the current working directory.
    """

    # AES block size in bytes; also the length of the CBC IV.
    _AES_BLOCK_SIZE = 16

    def __init__(self) -> None:
        # Take the AES key from the environment, or fall back to a random one.
        aes_key_b64 = os.getenv("AES_ENCRYPTION_KEY")
        if aes_key_b64:
            self.aes_key = base64.b64decode(aes_key_b64)
        else:
            # 32 random bytes -> AES-256.
            self.aes_key = os.urandom(32)
            # The key itself is deliberately NOT printed: writing secret key
            # material to stdout leaks it into logs.
            print(
                "警告: 未设置AES_ENCRYPTION_KEY环境变量,使用临时密钥"
                "(进程重启后将无法解密此前加密的数据)"
            )

        # RSA key pair (used for frontend <-> backend communication).
        self._load_or_generate_rsa_keys()

    def _load_or_generate_rsa_keys(self) -> None:
        """Load the RSA key pair from disk, generating it when absent.

        If the private key file is missing, a new 2048-bit pair is generated
        and both PEM files are written. If only the public key file is
        missing, it is re-derived from the existing private key, so an
        existing private key is never overwritten.
        """
        private_key_path = "rsa_private_key.pem"
        public_key_path = "rsa_public_key.pem"

        try:
            with open(private_key_path, "rb") as f:
                self.private_key = serialization.load_pem_private_key(
                    f.read(),
                    password=None,
                    backend=default_backend(),
                )
        except FileNotFoundError:
            # No private key yet: create a fresh 2048-bit pair and persist it.
            self.private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend(),
            )
            self.public_key = self.private_key.public_key()
            self._write_private_key(private_key_path)
            self._write_public_key(public_key_path)
            return

        try:
            with open(public_key_path, "rb") as f:
                self.public_key = serialization.load_pem_public_key(
                    f.read(),
                    backend=default_backend(),
                )
        except FileNotFoundError:
            # Only the public half is missing: rebuild it from the private
            # key instead of regenerating (and clobbering) the whole pair.
            self.public_key = self.private_key.public_key()
            self._write_public_key(public_key_path)

    def _write_private_key(self, path: str) -> None:
        """Write the private key as unencrypted PKCS8 PEM, owner-only (0o600)."""
        pem = self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        # os.open lets us set restrictive permissions at creation time;
        # O_BINARY only exists on Windows, where it prevents newline translation.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(pem)

    def _write_public_key(self, path: str) -> None:
        """Write the public key as SubjectPublicKeyInfo PEM."""
        with open(path, "wb") as f:
            f.write(self.public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            ))

    def get_public_key_pem(self) -> str:
        """Return the public key as a PEM string (for the frontend)."""
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return pem.decode('utf-8')

    def rsa_decrypt(self, encrypted_data: str) -> str:
        """Decrypt data with the RSA private key (jsencrypt-compatible PKCS1 padding).

        Args:
            encrypted_data: Base64-encoded ciphertext.

        Returns:
            The decrypted plaintext, decoded as UTF-8.

        Raises:
            ValueError: If Base64 decoding, decryption or UTF-8 decoding fails.
        """
        try:
            encrypted_bytes = base64.b64decode(encrypted_data)
            # NOTE(review): PKCS#1 v1.5 encryption padding is kept only for
            # jsencrypt compatibility; it is susceptible to padding-oracle
            # attacks. Prefer OAEP if the client can be changed.
            decrypted_bytes = self.private_key.decrypt(
                encrypted_bytes,
                padding.PKCS1v15(),
            )
            return decrypted_bytes.decode('utf-8')
        except Exception as e:
            raise ValueError(f"RSA解密失败: {str(e)}") from e

    def aes_encrypt(self, plaintext: str) -> str:
        """Encrypt a string with AES-CBC (used for database storage).

        Args:
            plaintext: The text to encrypt.

        Returns:
            Base64 of ``IV || ciphertext`` (the 16-byte IV comes first).
        """
        block = self._AES_BLOCK_SIZE

        # A fresh random IV per message.
        iv = os.urandom(block)

        # NOTE(review): CBC provides no integrity protection; the format is
        # kept so previously stored values remain readable.
        cipher = Cipher(
            algorithms.AES(self.aes_key),
            modes.CBC(iv),
            backend=default_backend(),
        )
        encryptor = cipher.encryptor()

        # PKCS7 padding: always 1..16 bytes, each holding the pad length.
        plaintext_bytes = plaintext.encode('utf-8')
        padding_length = block - (len(plaintext_bytes) % block)
        padded_plaintext = plaintext_bytes + bytes([padding_length]) * padding_length

        ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()

        # Prepend the IV so decryption can recover it, then Base64-encode.
        return base64.b64encode(iv + ciphertext).decode('utf-8')

    def aes_decrypt(self, encrypted_data: str) -> str:
        """Decrypt a value produced by :meth:`aes_encrypt`.

        Args:
            encrypted_data: Base64 of ``IV || ciphertext``.

        Returns:
            The decrypted plaintext, decoded as UTF-8.

        Raises:
            ValueError: If the input is malformed, the PKCS7 padding is
                invalid, or the plaintext is not valid UTF-8.
        """
        block = self._AES_BLOCK_SIZE
        try:
            combined = base64.b64decode(encrypted_data)

            # Need the IV plus at least one full ciphertext block.
            if len(combined) < 2 * block or len(combined) % block != 0:
                raise ValueError("invalid ciphertext length")

            # Split off the IV that aes_encrypt prepended.
            iv = combined[:block]
            ciphertext = combined[block:]

            cipher = Cipher(
                algorithms.AES(self.aes_key),
                modes.CBC(iv),
                backend=default_backend(),
            )
            decryptor = cipher.decryptor()
            padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()

            # Validate the PKCS7 padding before stripping it; a pad byte of 0
            # or > block size would otherwise silently yield wrong output.
            padding_length = padded_plaintext[-1]
            expected_tail = bytes([padding_length]) * padding_length
            if not 1 <= padding_length <= block or not padded_plaintext.endswith(expected_tail):
                raise ValueError("invalid PKCS7 padding")

            plaintext = padded_plaintext[:-padding_length]
            return plaintext.decode('utf-8')
        except Exception as e:
            raise ValueError(f"AES解密失败: {str(e)}") from e


# Global singleton
encryption_service = EncryptionService()