encryption_service.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. """
  2. 加密服务
  3. 提供RSA非对称加密和AES对称加密功能
  4. 用于保护敏感数据(如身份证号)
  5. """
  6. import base64
  7. import os
  8. from cryptography.hazmat.primitives import hashes, serialization
  9. from cryptography.hazmat.primitives.asymmetric import rsa, padding
  10. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  11. from cryptography.hazmat.backends import default_backend
  12. from typing import Tuple
  13. class EncryptionService:
  14. """加密服务类"""
  15. def __init__(self):
  16. # 从环境变量获取AES密钥,如果没有则生成一个
  17. aes_key_b64 = os.getenv("AES_ENCRYPTION_KEY")
  18. if aes_key_b64:
  19. self.aes_key = base64.b64decode(aes_key_b64)
  20. else:
  21. # 生成32字节(256位)的AES密钥
  22. self.aes_key = os.urandom(32)
  23. print(f"警告: 未设置AES_ENCRYPTION_KEY环境变量,使用临时密钥: {base64.b64encode(self.aes_key).decode()}")
  24. # RSA密钥对(用于前后端通信)
  25. self._load_or_generate_rsa_keys()
  26. def _load_or_generate_rsa_keys(self):
  27. """加载或生成RSA密钥对"""
  28. private_key_path = "rsa_private_key.pem"
  29. public_key_path = "rsa_public_key.pem"
  30. try:
  31. # 尝试加载现有密钥
  32. with open(private_key_path, "rb") as f:
  33. self.private_key = serialization.load_pem_private_key(
  34. f.read(),
  35. password=None,
  36. backend=default_backend()
  37. )
  38. with open(public_key_path, "rb") as f:
  39. self.public_key = serialization.load_pem_public_key(
  40. f.read(),
  41. backend=default_backend()
  42. )
  43. except FileNotFoundError:
  44. # 生成新的RSA密钥对(2048位)
  45. self.private_key = rsa.generate_private_key(
  46. public_exponent=65537,
  47. key_size=2048,
  48. backend=default_backend()
  49. )
  50. self.public_key = self.private_key.public_key()
  51. # 保存密钥到文件
  52. with open(private_key_path, "wb") as f:
  53. f.write(self.private_key.private_bytes(
  54. encoding=serialization.Encoding.PEM,
  55. format=serialization.PrivateFormat.PKCS8,
  56. encryption_algorithm=serialization.NoEncryption()
  57. ))
  58. with open(public_key_path, "wb") as f:
  59. f.write(self.public_key.public_bytes(
  60. encoding=serialization.Encoding.PEM,
  61. format=serialization.PublicFormat.SubjectPublicKeyInfo
  62. ))
  63. def get_public_key_pem(self) -> str:
  64. """获取公钥的PEM格式字符串(用于前端)"""
  65. pem = self.public_key.public_bytes(
  66. encoding=serialization.Encoding.PEM,
  67. format=serialization.PublicFormat.SubjectPublicKeyInfo
  68. )
  69. return pem.decode('utf-8')
  70. def rsa_decrypt(self, encrypted_data: str) -> str:
  71. """
  72. 使用RSA私钥解密数据(兼容jsencrypt的PKCS1填充)
  73. Args:
  74. encrypted_data: Base64编码的加密数据
  75. Returns:
  76. 解密后的明文字符串
  77. """
  78. try:
  79. encrypted_bytes = base64.b64decode(encrypted_data)
  80. # 使用PKCS1填充(兼容jsencrypt)
  81. decrypted_bytes = self.private_key.decrypt(
  82. encrypted_bytes,
  83. padding.PKCS1v15()
  84. )
  85. return decrypted_bytes.decode('utf-8')
  86. except Exception as e:
  87. raise ValueError(f"RSA解密失败: {str(e)}")
  88. def aes_encrypt(self, plaintext: str) -> str:
  89. """
  90. 使用AES加密数据(用于数据库存储)
  91. Args:
  92. plaintext: 明文字符串
  93. Returns:
  94. Base64编码的加密数据(包含IV)
  95. """
  96. # 生成随机IV(初始化向量)
  97. iv = os.urandom(16)
  98. # 创建AES加密器(CBC模式)
  99. cipher = Cipher(
  100. algorithms.AES(self.aes_key),
  101. modes.CBC(iv),
  102. backend=default_backend()
  103. )
  104. encryptor = cipher.encryptor()
  105. # PKCS7填充
  106. plaintext_bytes = plaintext.encode('utf-8')
  107. padding_length = 16 - (len(plaintext_bytes) % 16)
  108. padded_plaintext = plaintext_bytes + bytes([padding_length] * padding_length)
  109. # 加密
  110. ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
  111. # 将IV和密文组合后Base64编码
  112. combined = iv + ciphertext
  113. return base64.b64encode(combined).decode('utf-8')
  114. def aes_decrypt(self, encrypted_data: str) -> str:
  115. """
  116. 使用AES解密数据
  117. Args:
  118. encrypted_data: Base64编码的加密数据(包含IV)
  119. Returns:
  120. 解密后的明文字符串
  121. """
  122. try:
  123. # Base64解码
  124. combined = base64.b64decode(encrypted_data)
  125. # 分离IV和密文
  126. iv = combined[:16]
  127. ciphertext = combined[16:]
  128. # 创建AES解密器
  129. cipher = Cipher(
  130. algorithms.AES(self.aes_key),
  131. modes.CBC(iv),
  132. backend=default_backend()
  133. )
  134. decryptor = cipher.decryptor()
  135. # 解密
  136. padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
  137. # 去除PKCS7填充
  138. padding_length = padded_plaintext[-1]
  139. plaintext = padded_plaintext[:-padding_length]
  140. return plaintext.decode('utf-8')
  141. except Exception as e:
  142. raise ValueError(f"AES解密失败: {str(e)}")
  143. # 全局单例
  144. encryption_service = EncryptionService()