# coding=utf-8
"""
SSO unified authentication client.

Integrates with the LQAI-middle-platform unified authentication platform.
"""
from typing import Dict, Optional
from urllib.parse import urlencode

import requests
from django.conf import settings
from django.utils.translation import gettext as _

from common.exception.app_exception import AppApiException
from common.utils.logger import maxkb_logger


class SSOClient:
    """Client for the SSO unified authentication service.

    Builds the authorization URL, exchanges an authorization code for an
    access token, and fetches the user profile with that token.

    Every constructor argument is optional; a falsy argument falls back to
    the matching Django setting (``SSO_BASE_URL``, ``SSO_CLIENT_ID``,
    ``SSO_CLIENT_SECRET``, ``SSO_REDIRECT_URI``), and to an empty string when
    the setting is absent.
    """

    # Timeout, in seconds, applied to every HTTP call made to the SSO server.
    REQUEST_TIMEOUT = 10

    def __init__(self,
                 base_url: Optional[str] = None,
                 client_id: Optional[str] = None,
                 client_secret: Optional[str] = None,
                 redirect_uri: Optional[str] = None):
        # The trailing ``or ''`` covers a setting that exists but is None,
        # which would otherwise crash on ``.rstrip``.
        resolved_base_url = base_url or getattr(settings, 'SSO_BASE_URL', '') or ''
        # Strip trailing slashes so endpoint paths can be appended with a
        # single leading '/'.
        self.base_url = resolved_base_url.rstrip('/')
        self.client_id = client_id or getattr(settings, 'SSO_CLIENT_ID', '')
        self.client_secret = client_secret or getattr(settings, 'SSO_CLIENT_SECRET', '')
        self.redirect_uri = redirect_uri or getattr(settings, 'SSO_REDIRECT_URI', '')

    def get_authorize_url(self, state: Optional[str] = None) -> str:
        """Return the SSO authorization URL.

        :param state: optional opaque value added as the ``state`` query
            parameter; omitted from the URL when falsy.
        :return: ``<base_url>/oauth/authorize`` followed by the URL-encoded
            query string.
        """
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'profile email',
        }
        if state:
            params['state'] = state
        # urlencode percent-escapes each value: the scope contains a space and
        # redirect_uri/state may contain ':', '/', '&', '=' or '#', any of
        # which would corrupt a hand-joined query string.
        query_string = urlencode(params)
        return f"{self.base_url}/oauth/authorize?{query_string}"

    def exchange_code(self, code: str) -> Dict:
        """Exchange an authorization code for an SSO access token.

        :param code: authorization code to send to the token endpoint.
        :return: the parsed JSON body of the token response.
        :raises AppApiException: with code 400 when the JSON body contains an
            ``error`` key; with code 502 when the request raises a
            ``requests`` exception (including an HTTP error status reported by
            ``raise_for_status``).
        """
        url = f"{self.base_url}/oauth/token"
        payload = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }
        try:
            response = requests.post(url, data=payload, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
            if 'error' in result:
                # Prefer the human-readable description, fall back to the
                # bare error code.
                raise AppApiException(400, _('SSO token exchange failed: {error}').format(
                    error=result.get('error_description', result['error'])))
            return result
        except requests.exceptions.RequestException as e:
            maxkb_logger.error(f'SSO token exchange request failed: {e}')
            # Chain the cause so the original traceback is preserved.
            raise AppApiException(
                502, _('Failed to connect to SSO server: {error}').format(error=str(e))) from e

    def get_userinfo(self, access_token: str) -> Dict:
        """Fetch the user profile for the given access token.

        :param access_token: token sent as ``Authorization: Bearer <token>``.
        :return: the parsed JSON body of the userinfo response.
        :raises AppApiException: with code 400 when the JSON body contains an
            ``error`` key; with code 502 when the request raises a
            ``requests`` exception (including an HTTP error status reported by
            ``raise_for_status``).
        """
        url = f"{self.base_url}/oauth/userinfo"
        headers = {
            'Authorization': f'Bearer {access_token}',
        }
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
            if 'error' in result:
                # Prefer the human-readable description, fall back to the
                # bare error code.
                raise AppApiException(400, _('SSO userinfo failed: {error}').format(
                    error=result.get('error_description', result['error'])))
            return result
        except requests.exceptions.RequestException as e:
            maxkb_logger.error(f'SSO userinfo request failed: {e}')
            # Chain the cause so the original traceback is preserved.
            raise AppApiException(
                502, _('Failed to connect to SSO server: {error}').format(error=str(e))) from e


def get_sso_client(**kwargs) -> SSOClient:
    """Return a new ``SSOClient``; keyword arguments are passed to its constructor."""
    return SSOClient(**kwargs)