authentication.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # coding=utf-8
  2. """
  3. @project: MaxKB
  4. @Author:虎虎
  5. @file: authentication.py
  6. @date:2025/4/15 20:12
  7. @desc:
  8. """
  9. from typing import List
  10. from django.utils.translation import gettext_lazy as _
  11. from rest_framework.request import Request
  12. from common.constants.permission_constants import PermissionConstants, RoleConstants, ViewPermission, CompareConstants, \
  13. Permission, Role
  14. from common.exception.app_exception import AppUnauthorizedFailed
  15. def exist_permissions_by_permission_constants(user_permission: List[PermissionConstants],
  16. permission_list: List[PermissionConstants]):
  17. """
  18. 用户是否拥有 permission_list的权限
  19. :param user_permission: 用户权限
  20. :param permission_list: 需要的权限
  21. :return: 是否拥有
  22. """
  23. return any(list(map(lambda up: permission_list.__contains__(up), user_permission)))
  24. def exist_role_by_role_constants(user_role: List[RoleConstants],
  25. role_list: List[RoleConstants]):
  26. """
  27. 用户是否拥有这个角色
  28. :param user_role: 用户角色
  29. :param role_list: 需要拥有的角色
  30. :return: 是否拥有
  31. """
  32. return any([True for role in role_list if user_role.__contains__(role.value.__str__())])
  33. def exist_permissions_by_view_permission(user_role: List[RoleConstants],
  34. user_permission: List[PermissionConstants | object],
  35. permission: ViewPermission, request, **kwargs):
  36. """
  37. 用户是否存在这些权限
  38. :param request:
  39. :param user_role: 用户角色
  40. :param user_permission: 用户权限
  41. :param permission: 所属权限
  42. :return: 是否存在 True False
  43. """
  44. role_list = [user_r(request, kwargs) if callable(user_r) else user_r for user_r in
  45. permission.roleList]
  46. role_ok = any(list(map(lambda up: role_list.__contains__(up),
  47. user_role)))
  48. permission_list = [user_p(request, kwargs) if callable(user_p) else user_p for user_p in
  49. permission.permissionList
  50. ]
  51. permission_ok = any(list(map(lambda up: permission_list.__contains__(up),
  52. user_permission)))
  53. return role_ok | permission_ok if permission.compare == CompareConstants.OR else role_ok & permission_ok
  54. def exist_permissions(user_role: List[RoleConstants], user_permission: List[PermissionConstants], permission, request,
  55. **kwargs):
  56. if isinstance(permission, ViewPermission):
  57. return exist_permissions_by_view_permission(user_role, user_permission, permission, request, **kwargs)
  58. if isinstance(permission, RoleConstants):
  59. return exist_role_by_role_constants(user_role, [permission])
  60. if isinstance(permission, PermissionConstants):
  61. return exist_permissions_by_permission_constants(user_permission, [permission])
  62. if isinstance(permission, Permission):
  63. return user_permission.__contains__(permission)
  64. if isinstance(permission, Role):
  65. return user_role.__contains__(permission.__str__())
  66. return False
  67. def exist(user_role: List[RoleConstants], user_permission: List[PermissionConstants], permission, request, **kwargs):
  68. if callable(permission):
  69. p = permission(request, kwargs)
  70. return exist_permissions(user_role, user_permission, p, request, **kwargs)
  71. return exist_permissions(user_role, user_permission, permission, request, **kwargs)
  72. def get_is_permissions(request, **kwargs):
  73. def is_permissions(*permission, compare=CompareConstants.OR):
  74. exit_list = list(
  75. map(lambda p: exist(request.auth.role_list, request.auth.permission_list, p, request, **kwargs),
  76. permission))
  77. return any(exit_list) if compare == CompareConstants.OR else all(exit_list)
  78. return is_permissions
  79. def check_batch_permissions(request: Request, id_list: List[str], id_key: str, permissions: tuple,
  80. compare=CompareConstants.OR, **kwargs) -> List[str]:
  81. if not id_list:
  82. return []
  83. # workspace manager 直接放行
  84. # 预检
  85. kwargs[id_key] = '__workspace_level_pre_check__'
  86. pre_check = list(
  87. map(lambda p: exist(request.auth.role_list, request.auth.permission_list, p, request, **kwargs),
  88. permissions)
  89. )
  90. if any(pre_check) if compare == CompareConstants.OR else all(pre_check):
  91. return list(id_list)
  92. # 逐个资源校验
  93. result_list = []
  94. for resource_id in id_list:
  95. kwargs[id_key] = resource_id
  96. exit_list = list(
  97. map(lambda p: exist(request.auth.role_list, request.auth.permission_list, p, request, **kwargs),
  98. permissions)
  99. )
  100. if any(exit_list) if compare == CompareConstants.OR else all(exit_list):
  101. result_list.append(resource_id)
  102. return result_list
  103. def has_permissions(*permission, compare=CompareConstants.OR):
  104. """
  105. 权限 role or permission
  106. :param compare: 比较符号
  107. :param permission: 如果是角色 role:roleId
  108. :return: 权限装饰器函数,用于判断用户是否有权限访问当前接口
  109. """
  110. def inner(func):
  111. def run(view, request, **kwargs):
  112. exit_list = list(
  113. map(lambda p: exist(request.auth.role_list, request.auth.permission_list, p, request, **kwargs),
  114. permission))
  115. # 判断是否有权限
  116. if any(exit_list) if compare == CompareConstants.OR else all(exit_list):
  117. return func(view, request, **kwargs)
  118. raise AppUnauthorizedFailed(403, _('No permission to access'))
  119. return run
  120. return inner