| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # coding=utf-8
- """
- SSO 统一认证客户端
- 对接 LQAI-middle-platform 统一认证平台
- """
- from typing import Dict, Optional
- import requests
- from django.conf import settings
- from django.utils.translation import gettext as _
- from common.exception.app_exception import AppApiException
- from common.utils.logger import maxkb_logger
class SSOClient:
    """Client for the SSO unified authentication platform.

    Covers the three calls of an OAuth authorization-code style flow as used
    by this module: building the authorize URL, exchanging the authorization
    code for an access token, and fetching the user's profile.
    """

    # Timeout (seconds) applied to every outgoing HTTP request.
    REQUEST_TIMEOUT = 10

    def __init__(self, base_url: Optional[str] = None, client_id: Optional[str] = None,
                 client_secret: Optional[str] = None, redirect_uri: Optional[str] = None):
        """Store the connection parameters, falling back to Django settings.

        Any falsy argument is replaced by the matching setting
        (``SSO_BASE_URL``, ``SSO_CLIENT_ID``, ``SSO_CLIENT_SECRET``,
        ``SSO_REDIRECT_URI``), or ``''`` when that setting is not defined.
        """
        # `or ''` guards against a setting explicitly configured as None, which
        # would otherwise break `.rstrip`. Trailing slashes are removed so that
        # endpoint paths can be appended with a single "/".
        self.base_url = (base_url or getattr(settings, 'SSO_BASE_URL', '') or '').rstrip('/')
        self.client_id = client_id or getattr(settings, 'SSO_CLIENT_ID', '')
        self.client_secret = client_secret or getattr(settings, 'SSO_CLIENT_SECRET', '')
        self.redirect_uri = redirect_uri or getattr(settings, 'SSO_REDIRECT_URI', '')

    def get_authorize_url(self, state: Optional[str] = None) -> str:
        """Return the URL the browser should be redirected to for SSO login.

        Args:
            state: Optional opaque value echoed back by the SSO server; it is
                only added to the query string when truthy.

        Returns:
            ``{base_url}/oauth/authorize`` followed by a percent-encoded
            query string.
        """
        # Local import keeps this block self-contained; urlencode escapes
        # reserved characters (":", "/", "&", "=", spaces, ...) so values such
        # as redirect_uri or a caller-supplied state cannot corrupt the query
        # string or inject extra parameters.
        from urllib.parse import urlencode

        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'profile email',
        }
        if state:
            params['state'] = state
        return f"{self.base_url}/oauth/authorize?{urlencode(params)}"

    def exchange_code(self, code: str) -> Dict:
        """Exchange an authorization code for an SSO access token.

        Args:
            code: Authorization code received on the redirect URI.

        Returns:
            The decoded JSON body of the token endpoint response.

        Raises:
            AppApiException: code 400 when the JSON body contains an ``error``
                key; code 502 when the request itself fails (any
                ``requests.exceptions.RequestException``, which includes
                non-2xx statuses raised by ``raise_for_status``).
        """
        url = f"{self.base_url}/oauth/token"
        payload = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }
        try:
            response = requests.post(url, data=payload, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.RequestException as exc:
            maxkb_logger.error(f'SSO token exchange request failed: {exc}')
            # Chain the original exception so the root cause stays in tracebacks.
            raise AppApiException(
                502, _('Failed to connect to SSO server: {error}').format(error=str(exc))
            ) from exc
        if 'error' in result:
            # Prefer the human-readable description when the server sends one.
            raise AppApiException(400, _('SSO token exchange failed: {error}').format(
                error=result.get('error_description', result['error'])))
        return result

    def get_userinfo(self, access_token: str) -> Dict:
        """Fetch the profile of the user identified by ``access_token``.

        Args:
            access_token: Token sent as an ``Authorization: Bearer`` header.

        Returns:
            The decoded JSON body of the userinfo endpoint response.

        Raises:
            AppApiException: code 400 when the JSON body contains an ``error``
                key; code 502 when the request itself fails (any
                ``requests.exceptions.RequestException``, which includes
                non-2xx statuses raised by ``raise_for_status``).
        """
        url = f"{self.base_url}/oauth/userinfo"
        headers = {
            'Authorization': f'Bearer {access_token}',
        }
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.RequestException as exc:
            maxkb_logger.error(f'SSO userinfo request failed: {exc}')
            # Chain the original exception so the root cause stays in tracebacks.
            raise AppApiException(
                502, _('Failed to connect to SSO server: {error}').format(error=str(exc))
            ) from exc
        if 'error' in result:
            # Prefer the human-readable description when the server sends one.
            raise AppApiException(400, _('SSO userinfo failed: {error}').format(
                error=result.get('error_description', result['error'])))
        return result
def get_sso_client(**kwargs) -> SSOClient:
    """Build and return an ``SSOClient``.

    All keyword arguments are forwarded unchanged to the ``SSOClient``
    constructor.
    """
    client = SSOClient(**kwargs)
    return client
|