| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- """
- 加密服务
- 提供RSA非对称加密和AES对称加密功能
- 用于保护敏感数据(如身份证号)
- """
- import base64
- import os
- from cryptography.hazmat.primitives import hashes, serialization
- from cryptography.hazmat.primitives.asymmetric import rsa, padding
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from cryptography.hazmat.backends import default_backend
- from typing import Tuple
class EncryptionService:
    """Encryption helpers for protecting sensitive data (e.g. ID-card numbers).

    Two primitives are combined:

    * RSA (2048-bit key, PKCS#1 v1.5 padding) to decrypt values that the
      front end encrypted with the public key.
    * AES in CBC mode with PKCS#7 padding to encrypt values before they are
      written to the database.

    Attributes:
        aes_key: Raw AES key bytes, decoded from the Base64 value of the
            ``AES_ENCRYPTION_KEY`` environment variable, or 32 random bytes
            when that variable is unset.
        private_key: RSA private key loaded from (or generated and saved to)
            ``rsa_private_key.pem`` in the current working directory.
        public_key: RSA public key derived from ``private_key``.
    """

    # AES block size in bytes; also the IV length used for CBC mode.
    _AES_BLOCK_SIZE = 16

    def __init__(self):
        """Read the AES key from the environment and set up the RSA key pair."""
        aes_key_b64 = os.getenv("AES_ENCRYPTION_KEY")
        if aes_key_b64:
            self.aes_key = base64.b64decode(aes_key_b64)
        else:
            # No key configured: fall back to a random 32-byte (256-bit) key
            # that only lives as long as this process.
            self.aes_key = os.urandom(32)
            # Security fix: warn, but never echo the key material itself --
            # stdout usually ends up in log files.
            print("警告: 未设置AES_ENCRYPTION_KEY环境变量,使用临时密钥(重启后将无法解密已加密的数据)")

        # RSA key pair used for front-end -> back-end transport encryption.
        self._load_or_generate_rsa_keys()

    def _load_or_generate_rsa_keys(self):
        """Load the RSA private key from disk, generating a new pair if absent.

        The public key is always derived from the private key, so the two can
        never disagree. A missing public-key file alone no longer triggers
        regeneration (which previously overwrote the existing private key).
        """
        private_key_path = "rsa_private_key.pem"
        public_key_path = "rsa_public_key.pem"

        generated = False
        try:
            with open(private_key_path, "rb") as f:
                self.private_key = serialization.load_pem_private_key(
                    f.read(),
                    password=None,
                    backend=default_backend(),
                )
        except FileNotFoundError:
            # No private key on disk yet: create a fresh 2048-bit key.
            self.private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend(),
            )
            generated = True

            private_pem = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
            # The PEM is stored unencrypted, so create the file readable and
            # writable by the owner only instead of relying on the umask.
            flags = (
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
            )
            fd = os.open(private_key_path, flags, 0o600)
            with os.fdopen(fd, "wb") as f:
                f.write(private_pem)

        self.public_key = self.private_key.public_key()

        # (Re)write the public PEM when the pair is new (a stale file may be
        # lying around) or when the file does not exist at all.
        if generated or not os.path.exists(public_key_path):
            with open(public_key_path, "wb") as f:
                f.write(self.public_key.public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo,
                ))

    def get_public_key_pem(self) -> str:
        """Return the public key as a PEM string (for the front end)."""
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return pem.decode('utf-8')

    def rsa_decrypt(self, encrypted_data: str) -> str:
        """Decrypt Base64-encoded RSA ciphertext with the private key.

        PKCS#1 v1.5 padding is used for compatibility with jsencrypt.

        Args:
            encrypted_data: Base64-encoded ciphertext.

        Returns:
            The decrypted plaintext, decoded as UTF-8.

        Raises:
            ValueError: If decoding, decryption or UTF-8 decoding fails; the
                underlying exception is attached as ``__cause__``.
        """
        try:
            encrypted_bytes = base64.b64decode(encrypted_data)
            decrypted_bytes = self.private_key.decrypt(
                encrypted_bytes,
                padding.PKCS1v15(),
            )
            return decrypted_bytes.decode('utf-8')
        except Exception as e:
            raise ValueError(f"RSA解密失败: {str(e)}") from e

    def aes_encrypt(self, plaintext: str) -> str:
        """Encrypt a string with AES-CBC (for database storage).

        Args:
            plaintext: Text to encrypt; it is encoded as UTF-8 first.

        Returns:
            Base64 of ``IV || ciphertext``, where the IV is 16 random bytes.
        """
        # A fresh random IV per message.
        iv = os.urandom(self._AES_BLOCK_SIZE)

        cipher = Cipher(
            algorithms.AES(self.aes_key),
            modes.CBC(iv),
            backend=default_backend(),
        )
        encryptor = cipher.encryptor()

        padded_plaintext = self._pkcs7_pad(
            plaintext.encode('utf-8'), self._AES_BLOCK_SIZE
        )
        ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()

        # Prefix the IV so aes_decrypt can recover it.
        return base64.b64encode(iv + ciphertext).decode('utf-8')

    def aes_decrypt(self, encrypted_data: str) -> str:
        """Decrypt a value produced by :meth:`aes_encrypt`.

        Args:
            encrypted_data: Base64 of ``IV || ciphertext``.

        Returns:
            The decrypted plaintext, decoded as UTF-8.

        Raises:
            ValueError: If the input cannot be decoded or decrypted, the
                PKCS#7 padding is malformed, or the plaintext is not valid
                UTF-8; the underlying exception is attached as ``__cause__``.
        """
        try:
            combined = base64.b64decode(encrypted_data)

            # The first block is the IV, the remainder is the ciphertext.
            iv = combined[:self._AES_BLOCK_SIZE]
            ciphertext = combined[self._AES_BLOCK_SIZE:]

            cipher = Cipher(
                algorithms.AES(self.aes_key),
                modes.CBC(iv),
                backend=default_backend(),
            )
            decryptor = cipher.decryptor()
            padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()

            # Validate the padding rather than blindly trusting the last
            # byte: a wrong key or corrupted data must fail loudly instead of
            # returning truncated or empty text.
            plaintext = self._pkcs7_unpad(padded_plaintext, self._AES_BLOCK_SIZE)
            return plaintext.decode('utf-8')
        except Exception as e:
            raise ValueError(f"AES解密失败: {str(e)}") from e

    @staticmethod
    def _pkcs7_pad(data: bytes, block_size: int = 16) -> bytes:
        """Append PKCS#7 padding so ``len(result)`` is a multiple of ``block_size``."""
        # Always pads: an already-aligned input gains one full block.
        pad_len = block_size - (len(data) % block_size)
        return data + bytes([pad_len]) * pad_len

    @staticmethod
    def _pkcs7_unpad(padded: bytes, block_size: int = 16) -> bytes:
        """Strip PKCS#7 padding, raising ``ValueError`` if it is malformed."""
        if not padded or len(padded) % block_size:
            raise ValueError("密文长度无效")
        pad_len = padded[-1]
        if not 1 <= pad_len <= block_size:
            raise ValueError("PKCS7填充无效")
        if padded[-pad_len:] != bytes([pad_len]) * pad_len:
            raise ValueError("PKCS7填充无效")
        return padded[:-pad_len]
# Module-level singleton: one shared EncryptionService, built at import time.
encryption_service = EncryptionService()
|