| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- # coding=utf-8
- """
- @project: MaxKB
- @Author:虎虎
- @file: workspace_user_resource_permission.py
- @date:2025/4/28 17:17
- @desc:
- """
- import json
- import os
- from django.contrib.postgres.fields import ArrayField
- from django.core.cache import cache
- from django.db import models
- from django.db.models import QuerySet, Q, TextField
- from django.db.models.functions import Cast
- from django.utils.translation import gettext_lazy as _
- from rest_framework import serializers
- from application.models import Application
- from common.constants.cache_version import Cache_Version
- from common.constants.permission_constants import get_default_workspace_user_role_mapping_list, RoleConstants, \
- ResourcePermission, ResourcePermissionRole, ResourceAuthType
- from common.database_model_manage.database_model_manage import DatabaseModelManage
- from common.db.search import native_search, native_page_search, get_dynamics_model
- from common.db.sql_execute import select_list
- from common.exception.app_exception import AppApiException
- from common.utils.common import get_file_content
- from knowledge.models import Knowledge
- from maxkb.conf import PROJECT_DIR
- from maxkb.settings import edition
- from models_provider.models import Model
- from system_manage.models import WorkspaceUserResourcePermission
- from tools.models import Tool
- from users.serializers.user import is_workspace_manage
- class PermissionSerializer(serializers.Serializer):
- VIEW = serializers.BooleanField(required=True, label="可读")
- MANAGE = serializers.BooleanField(required=True, label="管理")
- ROLE = serializers.BooleanField(required=True, label="跟随角色")
- class UserResourcePermissionItemResponse(serializers.Serializer):
- id = serializers.UUIDField(required=True, label="主键id")
- name = serializers.CharField(required=True, label="资源名称")
- auth_target_type = serializers.CharField(required=True, label="授权资源")
- user_id = serializers.UUIDField(required=True, label="用户id")
- icon = serializers.CharField(required=True, label="资源图标")
- auth_type = serializers.CharField(required=True, label="授权类型")
- permission = serializers.ChoiceField(required=False, allow_null=True, allow_blank=True,
- choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
- label=_('permission'))
- class UserResourcePermissionResponse(serializers.Serializer):
- KNOWLEDGE = UserResourcePermissionItemResponse(many=True)
- class UpdateTeamMemberItemPermissionSerializer(serializers.Serializer):
- target_id = serializers.CharField(required=True, label=_('target id'))
- permission = serializers.ChoiceField(required=False, allow_null=True, allow_blank=True,
- choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
- label=_('permission'))
- class UpdateUserResourcePermissionRequest(serializers.Serializer):
- user_resource_permission_list = UpdateTeamMemberItemPermissionSerializer(required=True, many=True)
- def is_valid(self, *, auth_target_type=None, workspace_id=None, raise_exception=False):
- super().is_valid(raise_exception=True)
- user_resource_permission_list = [{'target_id': urp.get('target_id'), 'auth_target_type': auth_target_type} for
- urp in
- self.data.get("user_resource_permission_list")]
- illegal_target_id_list = select_list(
- get_file_content(
- os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'check_member_permission_target_exists.sql')),
- [json.dumps(user_resource_permission_list), workspace_id, workspace_id, workspace_id, workspace_id,
- workspace_id, workspace_id, workspace_id])
- if illegal_target_id_list is not None and len(illegal_target_id_list) > 0:
- raise AppApiException(500,
- _('Non-existent id')+'[' + str(illegal_target_id_list) + ']')
- m_map = {
- "KNOWLEDGE": Knowledge,
- 'TOOL': Tool,
- 'MODEL': Model,
- 'APPLICATION': Application,
- }
- sql_map = {
- "KNOWLEDGE": 'get_knowledge_user_resource_permission.sql',
- 'TOOL': 'get_tool_user_resource_permission.sql',
- 'MODEL': 'get_model_user_resource_permission.sql',
- 'APPLICATION': 'get_application_user_resource_permission.sql'
- }
- class UserResourcePermissionUserListRequest(serializers.Serializer):
- name = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('resource name'))
- permission = serializers.MultipleChoiceField(required=False, allow_null=True, allow_blank=True,
- choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
- label=_('permission'))
- class UserResourcePermissionSerializer(serializers.Serializer):
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- user_id = serializers.CharField(required=True, label=_('user id'))
- auth_target_type = serializers.CharField(required=True, label=_('resource'))
- def get_queryset(self, instance):
- resource_query_set = QuerySet(
- model=get_dynamics_model({
- 'name': models.CharField(),
- "permission": models.CharField(),
- }))
- name = instance.get('name')
- permission = instance.get('permission')
- query_p_list = [None if p == "NOT_AUTH" else p for p in permission]
- if name:
- resource_query_set = resource_query_set.filter(name__contains=name)
- if permission:
- if all([p is None for p in query_p_list]):
- resource_query_set = resource_query_set.filter(permission=None)
- else:
- if any([p is None for p in query_p_list]):
- resource_query_set = resource_query_set.filter(
- Q(permission__in=query_p_list) | Q(permission=None))
- else:
- resource_query_set = resource_query_set.filter(
- permission__in=query_p_list)
- return {
- 'query_set': QuerySet(m_map.get(self.data.get('auth_target_type'))).filter(
- workspace_id=self.data.get('workspace_id')),
- 'folder_query_set': QuerySet(m_map.get(self.data.get('auth_target_type'))).filter(
- workspace_id=self.data.get('workspace_id')),
- 'workspace_user_resource_permission_query_set': QuerySet(WorkspaceUserResourcePermission).filter(
- workspace_id=self.data.get('workspace_id'), user=self.data.get('user_id'),
- auth_target_type=self.data.get('auth_target_type')),
- 'resource_query_set': resource_query_set
- }
- def is_auth(self, resource_id: str):
- self.is_valid(raise_exception=True)
- auth_target_type = self.data.get('auth_target_type')
- workspace_id = self.data.get('workspace_id')
- user_id = self.data.get('user_id')
- workspace_manage = is_workspace_manage(user_id, workspace_id)
- if workspace_manage:
- return True
- wurp = QuerySet(WorkspaceUserResourcePermission).filter(auth_target_type=auth_target_type,
- workspace_id=workspace_id, user=user_id,
- target=resource_id).first()
- if wurp is None:
- return False
- workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
- role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
- if wurp.auth_type == ResourceAuthType.ROLE.value:
- if workspace_user_role_mapping_model and role_permission_mapping_model:
- inner = QuerySet(workspace_user_role_mapping_model).filter(workspace_id=workspace_id, user_id=user_id)
- return QuerySet(role_permission_mapping_model).filter(role_id__in=inner,
- permission_id=(
- auth_target_type + ':READ')).exists()
- else:
- return False
- else:
- return wurp.permission_list.__contains__(ResourcePermission.VIEW.value)
- def auth_resource_batch(self, resource_id_list: list):
- self.is_valid(raise_exception=True)
- auth_target_type = self.data.get('auth_target_type')
- workspace_id = self.data.get('workspace_id')
- user_id = self.data.get('user_id')
- wurp = QuerySet(WorkspaceUserResourcePermission).filter(auth_target_type=auth_target_type,
- workspace_id=workspace_id, user_id=user_id).first()
- auth_type = wurp.auth_type if wurp else (
- ResourceAuthType.RESOURCE_PERMISSION_GROUP if edition == 'CE' else ResourceAuthType.ROLE)
- workspace_user_resource_permission = [WorkspaceUserResourcePermission(
- target=resource_id,
- auth_target_type=auth_target_type,
- permission_list=[ResourcePermission.VIEW,
- ResourcePermission.MANAGE] if auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP else [
- ResourcePermissionRole.ROLE],
- workspace_id=workspace_id,
- user_id=user_id,
- auth_type=auth_type
- ) for resource_id in resource_id_list]
- QuerySet(WorkspaceUserResourcePermission).bulk_create(workspace_user_resource_permission)
- # 刷新缓存
- version = Cache_Version.PERMISSION_LIST.get_version()
- key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
- cache.delete(key, version=version)
- return True
- def auth_resource(self, resource_id: str, is_folder=False):
- self.is_valid(raise_exception=True)
- auth_target_type = self.data.get('auth_target_type')
- workspace_id = self.data.get('workspace_id')
- user_id = self.data.get('user_id')
- WorkspaceUserResourcePermission(
- target=resource_id,
- auth_target_type=auth_target_type,
- permission_list=[ResourcePermission.VIEW,
- ResourcePermission.MANAGE],
- workspace_id=workspace_id,
- user_id=user_id,
- auth_type=ResourceAuthType.RESOURCE_PERMISSION_GROUP
- ).save()
- # 刷新缓存
- version = Cache_Version.PERMISSION_LIST.get_version()
- key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
- cache.delete(key, version=version)
- return True
- def list(self, instance, user, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- UserResourcePermissionUserListRequest(data=instance).is_valid(raise_exception=True)
- workspace_id = self.data.get("workspace_id")
- user_id = self.data.get("user_id")
- # 用户权限列表
- user_resource_permission_list = native_search(self.get_queryset(instance), get_file_content(
- os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', sql_map.get(self.data.get('auth_target_type')))))
- return [{**user_resource_permission}
- for user_resource_permission in user_resource_permission_list]
- def page(self, instance, current_page: int, page_size: int, user, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- UserResourcePermissionUserListRequest(data=instance).is_valid(raise_exception=True)
- workspace_id = self.data.get("workspace_id")
- user_id = self.data.get("user_id")
- # 用户对应的资源权限分页列表
- user_resource_permission_page_list = native_page_search(current_page, page_size, self.get_queryset(instance),
- get_file_content(
- os.path.join(PROJECT_DIR, "apps", "system_manage",
- 'sql', sql_map.get(
- self.data.get('auth_target_type')))
- ))
- return user_resource_permission_page_list
- def edit(self, instance, user, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- UpdateUserResourcePermissionRequest(data={'user_resource_permission_list': instance}).is_valid(
- raise_exception=True,
- auth_target_type=self.data.get(
- 'auth_target_type'),
- workspace_id=self.data.get('workspace_id'))
- workspace_id = self.data.get("workspace_id")
- user_id = self.data.get("user_id")
- update_list = []
- save_list = []
- targets = [item['target_id'] for item in instance]
- QuerySet(WorkspaceUserResourcePermission).filter(
- workspace_id=workspace_id,
- user_id=user_id,
- auth_target_type=self.data.get('auth_target_type'),
- target__in=targets
- ).delete()
- workspace_user_resource_permission_exist_list = []
- for user_resource_permission in instance:
- permission = user_resource_permission['permission']
- auth_type, permission_list = permission_map[permission]
- exist_list = [user_resource_permission_exist for user_resource_permission_exist in
- workspace_user_resource_permission_exist_list if
- user_resource_permission.get('target_id') == str(user_resource_permission_exist.target)]
- if len(exist_list) > 0:
- exist_list[0].permission_list = [key for key in user_resource_permission.get('permission').keys() if
- user_resource_permission.get('permission').get(key)]
- exist_list[0].auth_type = user_resource_permission.get('auth_type')
- update_list.append(exist_list[0])
- else:
- save_list.append(WorkspaceUserResourcePermission(target=user_resource_permission.get('target_id'),
- auth_target_type=self.data.get('auth_target_type'),
- permission_list=permission_list,
- workspace_id=workspace_id,
- user_id=user_id,
- auth_type=auth_type))
- # 批量更新
- QuerySet(WorkspaceUserResourcePermission).bulk_update(update_list, ['permission_list', 'auth_type']) if len(
- update_list) > 0 else None
- # 批量插入
- QuerySet(WorkspaceUserResourcePermission).bulk_create(save_list) if len(save_list) > 0 else None
- version = Cache_Version.PERMISSION_LIST.get_version()
- key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
- cache.delete(key, version=version)
- return instance
- class ResourceUserPermissionUserListRequest(serializers.Serializer):
- nick_name = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('workspace id'))
- username = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('workspace id'))
- permission = serializers.MultipleChoiceField(required=False, allow_null=True, allow_blank=True,
- choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
- label=_('permission'))
- class ResourceUserPermissionEditRequest(serializers.Serializer):
- user_id = serializers.CharField(required=True, label=_('workspace id'))
- permission = serializers.ChoiceField(required=True, choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
- label=_('permission'))
- permission_map = {
- "ROLE": ("ROLE", ["ROLE"]),
- "MANAGE": ("RESOURCE_PERMISSION_GROUP", ["MANAGE", "VIEW"]),
- "VIEW": ("RESOURCE_PERMISSION_GROUP", ["VIEW"]),
- "NOT_AUTH": ("RESOURCE_PERMISSION_GROUP", []),
- }
- class ResourceUserPermissionSerializer(serializers.Serializer):
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- target = serializers.CharField(required=True, label=_('resource id'))
- auth_target_type = serializers.CharField(required=True, label=_('resource'))
- users_permission = ResourceUserPermissionEditRequest(required=False, many=True, label=_('users_permission'))
- RESOURCE_MODEL_MAP = {
- 'APPLICATION': Application,
- 'KNOWLEDGE': Knowledge,
- 'TOOL': Tool
- }
- def get_queryset(self, instance, is_x_pack_ee: bool):
- user_query_set = QuerySet(model=get_dynamics_model({
- 'nick_name': models.CharField(),
- 'username': models.CharField(),
- "permission": models.CharField(),
- "u.id": models.UUIDField(),
- "role": models.CharField(),
- "role_setting.type": models.CharField(),
- "user_role_relation.workspace_id": models.CharField(),
- 'tmp.type_list': ArrayField(models.CharField()),
- 'tmp.role_name_list_str': models.CharField()
- }))
- nick_name = instance.get('nick_name')
- username = instance.get('username')
- role_name = instance.get('role')
- permission = instance.get('permission')
- query_p_list = [None if p == "NOT_AUTH" else p for p in permission]
- workspace_user_resource_permission_query_set = QuerySet(WorkspaceUserResourcePermission).filter(
- workspace_id=self.data.get('workspace_id'),
- auth_target_type=self.data.get('auth_target_type'),
- target=self.data.get('target'))
- if nick_name:
- user_query_set = user_query_set.filter(nick_name__contains=nick_name)
- if username:
- user_query_set = user_query_set.filter(username__contains=username)
- if permission:
- if all([p is None for p in query_p_list]):
- user_query_set = user_query_set.filter(
- permission=None)
- else:
- if any([p is None for p in query_p_list]):
- user_query_set = user_query_set.filter(
- Q(permission__in=query_p_list) | Q(permission=None))
- else:
- user_query_set = user_query_set.filter(
- permission__in=query_p_list)
- workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
- if workspace_user_role_mapping_model:
- user_query_set = user_query_set.filter(
- **{"u.id__in": QuerySet(workspace_user_role_mapping_model).filter(
- workspace_id=self.data.get('workspace_id')).values("user_id")})
- if is_x_pack_ee:
- user_query_set = user_query_set.filter(**{
- "tmp.type_list__contains": ["USER"]
- })
- role_name_and_type_query_set = QuerySet(model=get_dynamics_model({
- 'user_role_relation.workspace_id': models.CharField(),
- 'role_setting.type': models.CharField(),
- })).filter(**{
- "user_role_relation.workspace_id": self.data.get('workspace_id'),
- "role_setting.type": "USER",
- })
- if role_name:
- user_query_set = user_query_set.filter(
- **{'tmp.role_name_list_str__icontains': str(role_name)}
- )
- return {
- 'workspace_user_resource_permission_query_set': workspace_user_resource_permission_query_set,
- 'user_query_set': user_query_set,
- 'role_name_and_type_query_set': role_name_and_type_query_set
- }
- else:
- user_query_set = user_query_set.filter(
- **{'role': "USER"})
- return {
- 'workspace_user_resource_permission_query_set': workspace_user_resource_permission_query_set,
- 'user_query_set': user_query_set
- }
- def list(self, instance, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- ResourceUserPermissionUserListRequest(data=instance).is_valid(raise_exception=True)
- is_x_pack_ee = self.is_x_pack_ee()
- # 资源的用户授权列表
- resource_user_permission_list = native_search(self.get_queryset(instance, is_x_pack_ee), get_file_content(
- os.path.join(PROJECT_DIR, "apps", "system_manage",
- 'sql',
- ('get_resource_user_permission_detail_ee.sql' if is_x_pack_ee else
- 'get_resource_user_permission_detail.sql')
- )
- ))
- return resource_user_permission_list
- @staticmethod
- def is_x_pack_ee():
- workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
- role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
- return workspace_user_role_mapping_model is not None and role_permission_mapping_model is not None
- def page(self, instance, current_page: int, page_size: int, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- ResourceUserPermissionUserListRequest(data=instance).is_valid(raise_exception=True)
- # 分页列表
- is_x_pack_ee = self.is_x_pack_ee()
- resource_user_permission_page_list = native_page_search(current_page, page_size,
- self.get_queryset(instance, is_x_pack_ee),
- get_file_content(
- os.path.join(PROJECT_DIR, "apps", "system_manage",
- 'sql',
- (
- 'get_resource_user_permission_detail_ee.sql' if is_x_pack_ee else
- 'get_resource_user_permission_detail.sql')
- )
- ))
- return resource_user_permission_page_list
- def get_has_manage_permission_resource_under_folders(self, current_user_id, folder_ids):
- workspace_id = self.data.get("workspace_id")
- auth_target_type = self.data.get("auth_target_type")
- workspace_manage = is_workspace_manage(current_user_id, workspace_id)
- resource_model = self.RESOURCE_MODEL_MAP[auth_target_type]
- from folders.serializers.folder import has_exact_permission_by_role
- permission_id = f"{auth_target_type}:READ+AUTH"
- if workspace_manage:
- role_type = RoleConstants.WORKSPACE_MANAGE.value.__str__()
- has_user_role_exact_permission = has_exact_permission_by_role(current_user_id, workspace_id, permission_id,role_type)
- if has_user_role_exact_permission:
- current_user_managed_resources_ids = QuerySet(resource_model).filter(workspace_id=workspace_id,
- folder__in=folder_ids).annotate(
- id_str=Cast('id', TextField())
- ).values_list("id_str", flat=True)
- else:
- current_user_managed_resources_ids = []
- else:
- role_type = RoleConstants.USER.value.__str__()
- has_user_role_exact_permission = has_exact_permission_by_role(current_user_id, workspace_id, permission_id,role_type)
- permission_list = ['MANAGE']
- if has_user_role_exact_permission:
- permission_list = ['MANAGE','ROLE']
- current_user_managed_resources_ids = QuerySet(WorkspaceUserResourcePermission).filter(
- workspace_id=workspace_id, user_id=current_user_id, auth_target_type=auth_target_type,
- target__in=QuerySet(resource_model).filter(workspace_id=workspace_id, folder__in=folder_ids).annotate(
- id_str=Cast('id', TextField())
- ).values_list("id_str", flat=True),
- permission_list__overlap= permission_list).values_list('target', flat=True)
- return current_user_managed_resources_ids
- def edit(self, instance, with_valid=True, current_user_id=None):
- if with_valid:
- self.is_valid(raise_exception=True)
- ResourceUserPermissionEditRequest(data=instance, many=True).is_valid(
- raise_exception=True)
- workspace_id = self.data.get("workspace_id")
- target = self.data.get("target")
- auth_target_type = self.data.get("auth_target_type")
- users_permission = instance
- users_id = [item["user_id"] for item in users_permission]
- include_children = users_permission[0].get('include_children')
- folder_ids = users_permission[0].get('folder_ids')
- # 删除已存在的对应的用户在该资源下的权限
- if include_children:
- managed_resource_ids = list(
- self.get_has_manage_permission_resource_under_folders(current_user_id, folder_ids,)) + folder_ids
- else:
- managed_resource_ids = [target]
- QuerySet(WorkspaceUserResourcePermission).filter(
- workspace_id=workspace_id,
- target__in=managed_resource_ids,
- auth_target_type=auth_target_type,
- user_id__in=users_id
- ).delete()
- save_list = [
- WorkspaceUserResourcePermission(
- target=resource_id,
- auth_target_type=auth_target_type,
- workspace_id=workspace_id,
- auth_type=permission_map[item['permission']][0],
- user_id=item["user_id"],
- permission_list=permission_map[item['permission']][1]
- )
- for resource_id in managed_resource_ids
- for item in users_permission
- ]
- if save_list:
- QuerySet(WorkspaceUserResourcePermission).bulk_create(save_list)
- version = Cache_Version.PERMISSION_LIST.get_version()
- for user_id in users_id:
- key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
- cache.delete(key, version=version)
- return instance
|