rsa_util.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: rsa_util.py
  6. @date:2023/11/3 11:13
  7. @desc:
  8. """
  9. import base64
  10. import threading
  11. from functools import lru_cache
  12. from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
  13. from Crypto.PublicKey import RSA
  14. from django.core import cache
  15. from django.db.models import QuerySet
  16. from common.constants.cache_version import Cache_Version
  17. from system_manage.models import SystemSetting, SettingType
  18. lock = threading.Lock()
  19. rsa_cache = cache.cache
  20. cache_key = "rsa_key"
  21. # 对密钥加密的密码
  22. secret_code = "mac_kb_password"
  23. def generate():
  24. """
  25. 生成 私钥秘钥对
  26. :return:{key:'公钥',value:'私钥'}
  27. """
  28. # 生成一个 2048 位的密钥
  29. key = RSA.generate(2048)
  30. # 获取私钥
  31. encrypted_key = key.export_key(passphrase=secret_code, pkcs=8,
  32. protection="scryptAndAES128-CBC")
  33. return {'key': key.publickey().export_key(), 'value': encrypted_key}
  34. def get_key_pair():
  35. rsa_value = rsa_cache.get(cache_key)
  36. if rsa_value is None:
  37. with lock:
  38. rsa_value = rsa_cache.get(cache_key)
  39. if rsa_value is not None:
  40. return rsa_value
  41. rsa_value = get_key_pair_by_sql()
  42. version, get_key = Cache_Version.SYSTEM.value
  43. rsa_cache.set(get_key(key='rsa_key'), rsa_value, timeout=None, version=version)
  44. return rsa_value
  45. def get_key_pair_by_sql():
  46. system_setting = QuerySet(SystemSetting).filter(type=SettingType.RSA.value).first()
  47. if system_setting is None:
  48. kv = generate()
  49. system_setting = SystemSetting(type=SettingType.RSA.value,
  50. meta={'key': kv.get('key').decode(), 'value': kv.get('value').decode()})
  51. system_setting.save()
  52. return system_setting.meta
  53. def encrypt(msg, public_key: str | None = None):
  54. """
  55. 加密
  56. :param msg: 加密数据
  57. :param public_key: 公钥
  58. :return: 加密后的数据
  59. """
  60. if public_key is None:
  61. public_key = get_key_pair().get('key')
  62. cipher = _get_encrypt_cipher(public_key)
  63. encrypt_msg = cipher.encrypt(msg.encode("utf-8"))
  64. return base64.b64encode(encrypt_msg).decode()
  65. def decrypt(msg, pri_key: str | None = None):
  66. """
  67. 解密
  68. :param msg: 需要解密的数据
  69. :param pri_key: 私钥
  70. :return: 解密后数据
  71. """
  72. if pri_key is None:
  73. pri_key = get_key_pair().get('value')
  74. cipher = _get_cipher(pri_key)
  75. decrypt_data = cipher.decrypt(base64.b64decode(msg), 0)
  76. return decrypt_data.decode("utf-8")
  77. @lru_cache(maxsize=2)
  78. def _get_encrypt_cipher(public_key: str):
  79. """缓存加密 cipher 对象"""
  80. return PKCS1_cipher.new(RSA.importKey(extern_key=public_key, passphrase=secret_code))
  81. def rsa_long_encrypt(message, public_key: str | None = None, length=200):
  82. """
  83. 超长文本加密
  84. :param message: 需要加密的字符串
  85. :param public_key 公钥
  86. :param length: 1024bit的证书用100, 2048bit的证书用 200
  87. :return: 加密后的数据
  88. """
  89. if public_key is None:
  90. public_key = get_key_pair().get('key')
  91. cipher = _get_encrypt_cipher(public_key)
  92. if len(message) <= length:
  93. result = base64.b64encode(cipher.encrypt(message.encode('utf-8')))
  94. else:
  95. rsa_text = []
  96. for i in range(0, len(message), length):
  97. cont = message[i:i + length]
  98. rsa_text.append(cipher.encrypt(cont.encode('utf-8')))
  99. cipher_text = b''.join(rsa_text)
  100. result = base64.b64encode(cipher_text)
  101. return result.decode()
  102. @lru_cache(maxsize=2)
  103. def _get_cipher(pri_key: str):
  104. """缓存 cipher 对象,避免重复创建"""
  105. return PKCS1_cipher.new(RSA.importKey(pri_key, passphrase=secret_code))
  106. def rsa_long_decrypt(message, pri_key: str | None = None, length=256):
  107. """
  108. 超长文本解密,优化内存使用
  109. :param message: 需要解密的数据
  110. :param pri_key: 秘钥
  111. :param length : 1024bit的证书用128,2048bit证书用256位
  112. :return: 解密后的数据
  113. """
  114. if pri_key is None:
  115. pri_key = get_key_pair().get('value')
  116. cipher = _get_cipher(pri_key)
  117. base64_de = base64.b64decode(message)
  118. # 使用 bytearray 减少内存分配
  119. result = bytearray()
  120. for i in range(0, len(base64_de), length):
  121. result.extend(cipher.decrypt(base64_de[i:i + length], 0))
  122. return result.decode()