|
|
@@ -0,0 +1,144 @@
|
|
|
+# coding=utf-8
|
|
|
+"""
|
|
|
+SSO 统一认证视图
|
|
|
+对接 LQAI-middle-platform 统一认证平台
|
|
|
+"""
|
|
|
+import uuid_utils.compat as uuid
|
|
|
+from django.utils.translation import gettext as _
|
|
|
+from rest_framework.request import Request
|
|
|
+from rest_framework.views import APIView
|
|
|
+
|
|
|
+from common.auth import TokenAuth
|
|
|
+from common.exception.app_exception import AppApiException
|
|
|
+from common.result import result
|
|
|
+from sso.services.sso_client import get_sso_client
|
|
|
+from users.models import User
|
|
|
+from common.utils.logger import maxkb_logger
|
|
|
+
|
|
|
+# SSO 角色 code → 本地角色映射
|
|
|
+SSO_ROLE_MAP = {
|
|
|
+ 'super_admin': 'ADMIN',
|
|
|
+ 'ws_admin': 'WORKSPACE_MANAGE',
|
|
|
+ 'user': 'USER',
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+class SSOView(APIView):
|
|
|
+ """SSO 统一认证"""
|
|
|
+ authentication_classes = [TokenAuth]
|
|
|
+
|
|
|
+ class AuthorizeUrl(APIView):
|
|
|
+ """获取 SSO 授权 URL"""
|
|
|
+ authentication_classes = []
|
|
|
+
|
|
|
+ def get(self, request: Request):
|
|
|
+ redirect = request.query_params.get('redirect', 'false') == 'true'
|
|
|
+ state = request.query_params.get('state', str(uuid.uuid4()))
|
|
|
+
|
|
|
+ client = get_sso_client()
|
|
|
+ authorize_url = client.get_authorize_url(state=state)
|
|
|
+
|
|
|
+ if redirect:
|
|
|
+ from django.http import HttpResponseRedirect
|
|
|
+ return HttpResponseRedirect(authorize_url)
|
|
|
+
|
|
|
+ return result.success({
|
|
|
+ 'authorize_url': authorize_url,
|
|
|
+ 'state': state,
|
|
|
+ })
|
|
|
+
|
|
|
+ class ExchangeCode(APIView):
|
|
|
+ """授权码交换(核心免登接口)
|
|
|
+ POST /api/oauth/exchange-code
|
|
|
+ """
|
|
|
+ authentication_classes = []
|
|
|
+
|
|
|
+ def post(self, request: Request):
|
|
|
+ code = request.data.get('code', '')
|
|
|
+ if not code:
|
|
|
+ raise AppApiException(400, _('缺少授权码'))
|
|
|
+
|
|
|
+ client = get_sso_client()
|
|
|
+
|
|
|
+ # Step 1: 用 code 换 SSO access_token
|
|
|
+ token_data = client.exchange_code(code)
|
|
|
+ sso_access_token = token_data.get('access_token', '')
|
|
|
+
|
|
|
+ if not sso_access_token:
|
|
|
+ raise AppApiException(400, _('SSO token exchange failed'))
|
|
|
+
|
|
|
+ # Step 2: 获取用户信息(含角色)
|
|
|
+ userinfo = client.get_userinfo(sso_access_token)
|
|
|
+
|
|
|
+ # Step 3: 同步用户到本地数据库
|
|
|
+ user = SSOView.ExchangeCode._sync_user(userinfo)
|
|
|
+
|
|
|
+ # Step 4: 同步角色
|
|
|
+ roles = userinfo.get('roles', [])
|
|
|
+ SSOView.ExchangeCode._sync_roles(user, roles)
|
|
|
+
|
|
|
+ # Step 5: 签发本地 JWT
|
|
|
+ from common.utils.common import signing
|
|
|
+ token = signing.dumps({'user_id': str(user.id)})
|
|
|
+ refresh_token = signing.dumps({'user_id': str(user.id), 'type': 'refresh'})
|
|
|
+
|
|
|
+ return result.success({
|
|
|
+ 'token': token,
|
|
|
+ 'refresh_token': refresh_token,
|
|
|
+ 'user': {
|
|
|
+ 'id': str(user.id),
|
|
|
+ 'username': user.username,
|
|
|
+ 'email': user.email,
|
|
|
+ 'phone': user.phone,
|
|
|
+ 'is_superuser': user.is_superuser,
|
|
|
+ 'is_active': user.is_active,
|
|
|
+ 'roles': [user.role],
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _sync_user(userinfo: dict) -> User:
|
|
|
+ """同步 SSO 用户到本地数据库"""
|
|
|
+ username = userinfo.get('username', '')
|
|
|
+ email = userinfo.get('email', '')
|
|
|
+ real_name = userinfo.get('real_name', '')
|
|
|
+
|
|
|
+ if not username:
|
|
|
+ raise AppApiException(400, _('SSO user info missing username'))
|
|
|
+
|
|
|
+ user, created = User.objects.get_or_create(
|
|
|
+ username=username,
|
|
|
+ defaults={
|
|
|
+ 'email': email,
|
|
|
+ 'nick_name': real_name or username,
|
|
|
+ 'is_active': True,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ if not created:
|
|
|
+ user.email = email
|
|
|
+ if real_name:
|
|
|
+ user.nick_name = real_name
|
|
|
+ user.save()
|
|
|
+
|
|
|
+ return user
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _sync_roles(user: User, sso_roles: list):
|
|
|
+ """同步 SSO 角色到本地用户角色字段"""
|
|
|
+ if not sso_roles:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 取第一个匹配的本地角色
|
|
|
+ for role_info in sso_roles:
|
|
|
+ code = role_info.get('code', '')
|
|
|
+ local_role = SSO_ROLE_MAP.get(code)
|
|
|
+ if local_role:
|
|
|
+ user.role = local_role
|
|
|
+ user.save()
|
|
|
+ return
|
|
|
+
|
|
|
+ # 没有匹配的角色,默认普通用户
|
|
|
+ if not user.role:
|
|
|
+ user.role = 'USER'
|
|
|
+ user.save()
|