rsa_util.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: rsa_util.py
  6. @date:2023/11/3 11:13
  7. @desc:
  8. """
  9. import base64
  10. import threading
  11. from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
  12. from Crypto.PublicKey import RSA
  13. from django.core import cache
  14. from django.db.models import QuerySet
  15. from common.constants.cache_version import Cache_Version
  16. from local_model.models.system_setting import SystemSetting, SettingType
# Serializes cache population in get_key_pair() across threads.
lock = threading.Lock()
# Django's django.core.cache.cache object, used to hold the RSA key pair.
rsa_cache = cache.cache
# Key under which get_key_pair() looks up the key pair in the cache.
cache_key = "rsa_key"
# Passphrase used to encrypt the exported private key and to import it again.
# NOTE(review): the passphrase is hard-coded in source; confirm this is acceptable.
secret_code = "mac_kb_password"
  22. def generate():
  23. """
  24. 生成 私钥秘钥对
  25. :return:{key:'公钥',value:'私钥'}
  26. """
  27. # 生成一个 2048 位的密钥
  28. key = RSA.generate(2048)
  29. # 获取私钥
  30. encrypted_key = key.export_key(passphrase=secret_code, pkcs=8,
  31. protection="scryptAndAES128-CBC")
  32. return {'key': key.publickey().export_key(), 'value': encrypted_key}
  33. def get_key_pair():
  34. rsa_value = rsa_cache.get(cache_key)
  35. if rsa_value is None:
  36. with lock:
  37. rsa_value = rsa_cache.get(cache_key)
  38. if rsa_value is not None:
  39. return rsa_value
  40. rsa_value = get_key_pair_by_sql()
  41. version, get_key = Cache_Version.SYSTEM.value
  42. rsa_cache.set(get_key(key='rsa_key'), rsa_value, timeout=None, version=version)
  43. return rsa_value
  44. def get_key_pair_by_sql():
  45. system_setting = QuerySet(SystemSetting).filter(type=SettingType.RSA.value).first()
  46. if system_setting is None:
  47. kv = generate()
  48. system_setting = SystemSetting(type=SettingType.RSA.value,
  49. meta={'key': kv.get('key').decode(), 'value': kv.get('value').decode()})
  50. system_setting.save()
  51. return system_setting.meta
  52. def encrypt(msg, public_key: str | None = None):
  53. """
  54. 加密
  55. :param msg: 加密数据
  56. :param public_key: 公钥
  57. :return: 加密后的数据
  58. """
  59. if public_key is None:
  60. public_key = get_key_pair().get('key')
  61. cipher = PKCS1_cipher.new(RSA.importKey(public_key))
  62. encrypt_msg = cipher.encrypt(msg.encode("utf-8"))
  63. return base64.b64encode(encrypt_msg).decode()
  64. def decrypt(msg, pri_key: str | None = None):
  65. """
  66. 解密
  67. :param msg: 需要解密的数据
  68. :param pri_key: 私钥
  69. :return: 解密后数据
  70. """
  71. if pri_key is None:
  72. pri_key = get_key_pair().get('value')
  73. cipher = PKCS1_cipher.new(RSA.importKey(pri_key, passphrase=secret_code))
  74. decrypt_data = cipher.decrypt(base64.b64decode(msg), 0)
  75. return decrypt_data.decode("utf-8")
  76. def rsa_long_encrypt(message, public_key: str | None = None, length=200):
  77. """
  78. 超长文本加密
  79. :param message: 需要加密的字符串
  80. :param public_key 公钥
  81. :param length: 1024bit的证书用100, 2048bit的证书用 200
  82. :return: 加密后的数据
  83. """
  84. # 读取公钥
  85. if public_key is None:
  86. public_key = get_key_pair().get('key')
  87. cipher = PKCS1_cipher.new(RSA.importKey(extern_key=public_key,
  88. passphrase=secret_code))
  89. # 处理:Plaintext is too long. 分段加密
  90. if len(message) <= length:
  91. # 对编码的数据进行加密,并通过base64进行编码
  92. result = base64.b64encode(cipher.encrypt(message.encode('utf-8')))
  93. else:
  94. rsa_text = []
  95. # 对编码后的数据进行切片,原因:加密长度不能过长
  96. for i in range(0, len(message), length):
  97. cont = message[i:i + length]
  98. # 对切片后的数据进行加密,并新增到text后面
  99. rsa_text.append(cipher.encrypt(cont.encode('utf-8')))
  100. # 加密完进行拼接
  101. cipher_text = b''.join(rsa_text)
  102. # base64进行编码
  103. result = base64.b64encode(cipher_text)
  104. return result.decode()
  105. def rsa_long_decrypt(message, pri_key: str | None = None, length=256):
  106. """
  107. 超长文本解密,默认不加密
  108. :param message: 需要解密的数据
  109. :param pri_key: 秘钥
  110. :param length : 1024bit的证书用128,2048bit证书用256位
  111. :return: 解密后的数据
  112. """
  113. if pri_key is None:
  114. pri_key = get_key_pair().get('value')
  115. cipher = PKCS1_cipher.new(RSA.importKey(pri_key, passphrase=secret_code))
  116. base64_de = base64.b64decode(message)
  117. res = []
  118. for i in range(0, len(base64_de), length):
  119. res.append(cipher.decrypt(base64_de[i:i + length], 0))
  120. return b"".join(res).decode()