# coding=utf-8
"""
    User group views.

    Management, batch membership and batch authorization API built on
    system_manage.UserGroup.
"""
from django.utils.translation import gettext as _
from rest_framework.request import Request
from rest_framework.views import APIView

from common.auth import TokenAuth
from common.exception.app_exception import AppApiException
from common.result import result
from system_manage.models.chat_user import UserGroup, UserGroupRelation, ResourceChatUserGroupAuthorize
from users.models.user import User


def _get_group_or_404(group_id: str) -> UserGroup:
    """Return the user group with the given id.

    Raises:
        AppApiException: with code 404 when no group has this id.
    """
    try:
        return UserGroup.objects.get(id=group_id)
    except UserGroup.DoesNotExist:
        raise AppApiException(404, _('用户组不存在'))


class UserGroupView(APIView):
    """User group management."""
    authentication_classes = [TokenAuth]

    class List(APIView):
        """List user groups."""
        authentication_classes = [TokenAuth]

        def get(self, request: Request, workspace_id: str):
            """Return every user group with its member and authorization counts."""
            # NOTE(review): workspace_id is not used to filter the groups here;
            # confirm whether UserGroup is meant to be global across workspaces.
            # The two counts are issued as separate queries for each group.
            groups = UserGroup.objects.all()
            data = [{
                'id': str(g.id),
                'name': g.name,
                'member_count': UserGroupRelation.objects.filter(group=g).count(),
                'auth_count': ResourceChatUserGroupAuthorize.objects.filter(user_group=g).count(),
            } for g in groups]
            return result.success(data)

    class Create(APIView):
        """Create a user group."""
        authentication_classes = [TokenAuth]

        def post(self, request: Request, workspace_id: str):
            """Create a group named by the ``name`` field of the request body.

            Raises:
                AppApiException: 400 when the name is empty or already in use.
            """
            name = request.data.get('name', '')
            if not name:
                raise AppApiException(400, _('用户组名称不能为空'))
            if UserGroup.objects.filter(name=name).exists():
                raise AppApiException(400, _('用户组名称已存在'))
            group = UserGroup.objects.create(name=name)
            return result.success({
                'id': str(group.id),
                'name': group.name,
            })

    class Operate(APIView):
        """Operate on a single user group (retrieve / update / delete)."""
        authentication_classes = [TokenAuth]

        def get(self, request: Request, workspace_id: str, group_id: str):
            """Return the group together with its members and authorizations.

            Raises:
                AppApiException: 404 when the group does not exist.
            """
            group = _get_group_or_404(group_id)

            # Member list; select_related loads each user in the same query
            # instead of one extra query per relation.
            relations = UserGroupRelation.objects.filter(group=group).select_related('user')
            member_list = [{
                'id': str(r.user_id),
                'username': r.user.username,
                'nick_name': r.user.nick_name,
            } for r in relations if r.user]

            # Authorization list
            auths = ResourceChatUserGroupAuthorize.objects.filter(user_group=group)
            auth_list = [{
                'id': str(a.id),
                'resource_type': a.resource_type,
                'resource_id': str(a.resource_id),
                'is_auth': a.is_auth,
            } for a in auths]

            return result.success({
                'id': str(group.id),
                'name': group.name,
                'members': member_list,
                'authorizations': auth_list,
            })

        def put(self, request: Request, workspace_id: str, group_id: str):
            """Rename the group when the request body carries a ``name`` field.

            Raises:
                AppApiException: 404 when the group does not exist; 400 when the
                    new name is empty or used by another group.
            """
            group = _get_group_or_404(group_id)
            name = request.data.get('name')
            if name is not None:
                # Same rule as Create: a group may not have an empty name.
                if not name:
                    raise AppApiException(400, _('用户组名称不能为空'))
                if UserGroup.objects.filter(name=name).exclude(id=group_id).exists():
                    raise AppApiException(400, _('用户组名称已存在'))
                group.name = name
                group.save()
            return result.success({
                'id': str(group.id),
                'name': group.name,
            })

        def delete(self, request: Request, workspace_id: str, group_id: str):
            """Delete the group along with its member relations and authorizations.

            Raises:
                AppApiException: 404 when the group does not exist.
            """
            group = _get_group_or_404(group_id)
            # Remove dependent rows before the group itself.
            UserGroupRelation.objects.filter(group=group).delete()
            ResourceChatUserGroupAuthorize.objects.filter(user_group=group).delete()
            group.delete()
            return result.success(True)

    class BatchAddMembers(APIView):
        """Add members to a user group in batch."""
        authentication_classes = [TokenAuth]

        def post(self, request: Request, workspace_id: str, group_id: str):
            """Add the users listed in ``user_ids`` to the group.

            Ids that match no user are skipped. The response reports how many
            relations were newly created (existing memberships are not counted).

            Raises:
                AppApiException: 404 when the group does not exist; 400 when
                    ``user_ids`` is missing or empty.
            """
            group = _get_group_or_404(group_id)
            user_ids = request.data.get('user_ids', [])
            if not user_ids:
                raise AppApiException(400, _('用户ID列表不能为空'))

            added_count = 0
            for user_id in user_ids:
                try:
                    user = User.objects.get(id=user_id)
                except User.DoesNotExist:
                    continue
                # Do not unpack into `_`: that would make `_` a local name and
                # break the gettext calls above with UnboundLocalError.
                _relation, created = UserGroupRelation.objects.get_or_create(
                    group=group,
                    user=user
                )
                if created:
                    added_count += 1

            return result.success({'added_count': added_count})

    class BatchRemoveMembers(APIView):
        """Remove members from a user group in batch."""
        authentication_classes = [TokenAuth]

        def post(self, request: Request, workspace_id: str, group_id: str):
            """Remove the users listed in ``user_ids`` from the group.

            Raises:
                AppApiException: 404 when the group does not exist; 400 when
                    ``user_ids`` is missing or empty.
            """
            group = _get_group_or_404(group_id)
            user_ids = request.data.get('user_ids', [])
            if not user_ids:
                raise AppApiException(400, _('用户ID列表不能为空'))

            # QuerySet.delete() returns (total_deleted, per_model_counts).
            removed_count = UserGroupRelation.objects.filter(
                group=group,
                user_id__in=user_ids
            ).delete()[0]

            return result.success({'removed_count': removed_count})

    class BatchAuthorize(APIView):
        """Authorize resources for a user group in batch."""
        authentication_classes = [TokenAuth]

        def post(self, request: Request, workspace_id: str, group_id: str):
            """Create or update the authorizations listed in ``authorizations``.

            Each entry is expected to be a mapping with ``resource_type``,
            ``resource_id`` and an optional ``is_auth`` (default ``True``).
            Entries that are not mappings or lack a resource type/id are
            skipped. The response reports how many rows were newly created
            (updated rows are not counted).

            Raises:
                AppApiException: 404 when the group does not exist; 400 when
                    ``authorizations`` is missing or empty.
            """
            group = _get_group_or_404(group_id)
            authorizations = request.data.get('authorizations', [])
            if not authorizations:
                raise AppApiException(400, _('授权列表不能为空'))

            added_count = 0
            for auth in authorizations:
                # Malformed entries are skipped like incomplete ones rather
                # than failing the whole batch with an AttributeError.
                if not isinstance(auth, dict):
                    continue
                resource_type = auth.get('resource_type')
                resource_id = auth.get('resource_id')
                is_auth = auth.get('is_auth', True)

                if not resource_type or not resource_id:
                    continue

                # Do not unpack into `_`: that would make `_` a local name and
                # break the gettext calls above with UnboundLocalError.
                _authorization, created = ResourceChatUserGroupAuthorize.objects.update_or_create(
                    user_group=group,
                    resource_type=resource_type,
                    resource_id=resource_id,
                    defaults={
                        'workspace_id': workspace_id,
                        'is_auth': is_auth
                    }
                )
                if created:
                    added_count += 1

            return result.success({'added_count': added_count})

    class BatchRevokeAuthorization(APIView):
        """Revoke authorizations of a user group in batch."""
        authentication_classes = [TokenAuth]

        def post(self, request: Request, workspace_id: str, group_id: str):
            """Delete the group's authorizations whose ids are in ``auth_ids``.

            Raises:
                AppApiException: 404 when the group does not exist; 400 when
                    ``auth_ids`` is missing or empty.
            """
            group = _get_group_or_404(group_id)
            auth_ids = request.data.get('auth_ids', [])
            if not auth_ids:
                raise AppApiException(400, _('授权ID列表不能为空'))

            # QuerySet.delete() returns (total_deleted, per_model_counts).
            removed_count = ResourceChatUserGroupAuthorize.objects.filter(
                user_group=group,
                id__in=auth_ids
            ).delete()[0]

            return result.success({'removed_count': removed_count})