| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- # coding=utf-8
- """
- SSO 统一认证视图
- 对接 LQAI-middle-platform 统一认证平台
- """
- import uuid_utils.compat as uuid
- from django.utils.translation import gettext as _
- from rest_framework.request import Request
- from rest_framework.views import APIView
- from common.auth import TokenAuth
- from common.exception.app_exception import AppApiException
- from common.result import result
- from sso.services.sso_client import get_sso_client
- from users.models import User
- from common.utils.logger import maxkb_logger
- # SSO 角色 code → 本地角色映射
- SSO_ROLE_MAP = {
- 'super_admin': 'ADMIN',
- 'ws_admin': 'WORKSPACE_MANAGE',
- 'user': 'USER',
- }
- class SSOView(APIView):
- """SSO 统一认证"""
- authentication_classes = [TokenAuth]
- class AuthorizeUrl(APIView):
- """获取 SSO 授权 URL"""
- authentication_classes = []
- def get(self, request: Request):
- redirect = request.query_params.get('redirect', 'false') == 'true'
- state = request.query_params.get('state', str(uuid.uuid4()))
- client = get_sso_client()
- authorize_url = client.get_authorize_url(state=state)
- if redirect:
- from django.http import HttpResponseRedirect
- return HttpResponseRedirect(authorize_url)
- return result.success({
- 'authorize_url': authorize_url,
- 'state': state,
- })
- class ExchangeCode(APIView):
- """授权码交换(核心免登接口)
- POST /api/oauth/exchange-code
- """
- authentication_classes = []
- def post(self, request: Request):
- code = request.data.get('code', '')
- if not code:
- raise AppApiException(400, _('缺少授权码'))
- client = get_sso_client()
- # Step 1: 用 code 换 SSO access_token
- token_data = client.exchange_code(code)
- sso_access_token = token_data.get('access_token', '')
- if not sso_access_token:
- raise AppApiException(400, _('SSO token exchange failed'))
- # Step 2: 获取用户信息(含角色)
- userinfo = client.get_userinfo(sso_access_token)
- # Step 3: 同步用户到本地数据库
- user = SSOView.ExchangeCode._sync_user(userinfo)
- # Step 4: 同步角色
- roles = userinfo.get('roles', [])
- SSOView.ExchangeCode._sync_roles(user, roles)
- # Step 5: 签发本地 JWT
- from common.utils.common import signing
- token = signing.dumps({'user_id': str(user.id)})
- refresh_token = signing.dumps({'user_id': str(user.id), 'type': 'refresh'})
- return result.success({
- 'token': token,
- 'refresh_token': refresh_token,
- 'user': {
- 'id': str(user.id),
- 'username': user.username,
- 'email': user.email,
- 'phone': user.phone,
- 'is_superuser': user.is_superuser,
- 'is_active': user.is_active,
- 'roles': [user.role],
- },
- })
- @staticmethod
- def _sync_user(userinfo: dict) -> User:
- """同步 SSO 用户到本地数据库"""
- username = userinfo.get('username', '')
- email = userinfo.get('email', '')
- real_name = userinfo.get('real_name', '')
- if not username:
- raise AppApiException(400, _('SSO user info missing username'))
- user, created = User.objects.get_or_create(
- username=username,
- defaults={
- 'email': email,
- 'nick_name': real_name or username,
- 'is_active': True,
- }
- )
- if not created:
- user.email = email
- if real_name:
- user.nick_name = real_name
- user.save()
- return user
- @staticmethod
- def _sync_roles(user: User, sso_roles: list):
- """同步 SSO 角色到本地用户角色字段"""
- if not sso_roles:
- return
- # 取第一个匹配的本地角色
- for role_info in sso_roles:
- code = role_info.get('code', '')
- local_role = SSO_ROLE_MAP.get(code)
- if local_role:
- user.role = local_role
- user.save()
- return
- # 没有匹配的角色,默认普通用户
- if not user.role:
- user.role = 'USER'
- user.save()
|