authenticate.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
# coding=utf-8
"""
    @project: qabot
    @Author:虎虎
    @file: authenticate.py
    @date:2023/9/4 11:16
    @desc: Authentication classes
"""
  9. from importlib import import_module
  10. from django.conf import settings
  11. from django.core import cache
  12. from django.core import signing
  13. from django.utils.translation import gettext_lazy as _
  14. from drf_spectacular.extensions import OpenApiAuthenticationExtension
  15. from rest_framework.authentication import TokenAuthentication
  16. from common.exception.app_exception import AppAuthenticationFailed, AppEmbedIdentityFailed, AppChatNumOutOfBoundsFailed, \
  17. AppApiException
  18. from common.utils.logger import maxkb_logger
  19. token_cache = cache.caches['default']
  20. class AnonymousAuthentication(TokenAuthentication):
  21. def authenticate(self, request):
  22. return None, None
  23. class AnonymousAuthenticationScheme(OpenApiAuthenticationExtension):
  24. target_class = AnonymousAuthentication # 绑定到你的自定义认证类
  25. name = "AnonymousAuth" # 自定义认证名称(显示在 Swagger UI 中)
  26. def get_security_definition(self, auto_schema):
  27. # 定义认证方式,这里假设匿名认证不需要凭证
  28. return {
  29. }
  30. def get_security_requirement(self, auto_schema):
  31. # 返回安全要求(空字典表示无需认证)
  32. return {}
  33. def new_instance_by_class_path(class_path: str):
  34. parts = class_path.rpartition('.')
  35. package_path = parts[0]
  36. class_name = parts[2]
  37. module = import_module(package_path)
  38. HandlerClass = getattr(module, class_name)
  39. return HandlerClass()
  40. handles = [new_instance_by_class_path(class_path) for class_path in settings.AUTH_HANDLES]
  41. chat_handles = [new_instance_by_class_path(class_path) for class_path in settings.CHAT_AUTH_HANDLES]
  42. all_handles = handles + chat_handles
  43. class TokenDetails:
  44. token_details = None
  45. is_load = False
  46. def __init__(self, token: str):
  47. self.token = token
  48. def get_token_details(self):
  49. if self.token_details is None and not self.is_load:
  50. try:
  51. self.token_details = signing.loads(self.token)
  52. except Exception as e:
  53. self.is_load = True
  54. return self.token_details
  55. class TokenAuth(TokenAuthentication):
  56. keyword = "Bearer"
  57. # 重新 authenticate 方法,自定义认证规则
  58. def authenticate(self, request):
  59. auth = request.META.get('HTTP_AUTHORIZATION')
  60. # 未认证
  61. if auth is None:
  62. raise AppAuthenticationFailed(1003, _('Not logged in, please log in first'))
  63. if not auth.startswith("Bearer "):
  64. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  65. try:
  66. token = auth[7:]
  67. token_details = TokenDetails(token)
  68. for handle in handles:
  69. if handle.support(request, token, token_details.get_token_details):
  70. return handle.handle(request, token, token_details.get_token_details)
  71. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  72. except Exception as e:
  73. maxkb_logger.error(f'Exception: {e}', exc_info=True)
  74. if isinstance(e, AppEmbedIdentityFailed) or isinstance(e, AppChatNumOutOfBoundsFailed) or isinstance(e,
  75. AppApiException):
  76. raise e
  77. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  78. class ChatTokenAuth(TokenAuthentication):
  79. keyword = "Bearer"
  80. # 重新 authenticate 方法,自定义认证规则
  81. def authenticate(self, request):
  82. auth = request.META.get('HTTP_AUTHORIZATION')
  83. # 未认证
  84. if auth is None:
  85. raise AppAuthenticationFailed(1003, _('Not logged in, please log in first'))
  86. if not auth.startswith("Bearer "):
  87. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  88. try:
  89. token = auth[7:]
  90. token_details = TokenDetails(token)
  91. for handle in chat_handles:
  92. if handle.support(request, token, token_details.get_token_details):
  93. return handle.handle(request, token, token_details.get_token_details)
  94. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  95. except Exception as e:
  96. maxkb_logger.error(f'Exception: {e}', exc_info=True)
  97. if isinstance(e, AppEmbedIdentityFailed) or isinstance(e, AppChatNumOutOfBoundsFailed) or isinstance(e,
  98. AppApiException):
  99. raise e
  100. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  101. class AllTokenAuth(TokenAuthentication):
  102. keyword = "Bearer"
  103. # 重新 authenticate 方法,自定义认证规则
  104. def authenticate(self, request):
  105. auth = request.META.get('HTTP_AUTHORIZATION')
  106. # 未认证
  107. if auth is None:
  108. raise AppAuthenticationFailed(1003, _('Not logged in, please log in first'))
  109. if not auth.startswith("Bearer "):
  110. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  111. try:
  112. token = auth[7:]
  113. token_details = TokenDetails(token)
  114. for handle in all_handles:
  115. if handle.support(request, token, token_details.get_token_details):
  116. return handle.handle(request, token, token_details.get_token_details)
  117. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  118. except Exception as e:
  119. maxkb_logger.error(f'Exception: {e}', exc_info=True)
  120. if isinstance(e, AppEmbedIdentityFailed) or isinstance(e, AppChatNumOutOfBoundsFailed) or isinstance(e,
  121. AppApiException):
  122. raise e
  123. raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
  124. class WebhookAuth(TokenAuthentication):
  125. keyword = "Bearer"
  126. # 重新 authenticate 方法,自定义认证规则
  127. def authenticate(self, request):
  128. return None, {}