sso_client.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # coding=utf-8
  2. """
  3. SSO 统一认证客户端
  4. 对接 LQAI-middle-platform 统一认证平台
  5. """
  6. from typing import Dict, Optional
  7. import requests
  8. from django.conf import settings
  9. from django.utils.translation import gettext as _
  10. from common.exception.app_exception import AppApiException
  11. from common.utils.logger import maxkb_logger
  12. class SSOClient:
  13. """SSO 统一认证客户端"""
  14. def __init__(self, base_url: str = None, client_id: str = None,
  15. client_secret: str = None, redirect_uri: str = None):
  16. self.base_url = (base_url or getattr(settings, 'SSO_BASE_URL', '')).rstrip('/')
  17. self.client_id = client_id or getattr(settings, 'SSO_CLIENT_ID', '')
  18. self.client_secret = client_secret or getattr(settings, 'SSO_CLIENT_SECRET', '')
  19. self.redirect_uri = redirect_uri or getattr(settings, 'SSO_REDIRECT_URI', '')
  20. def get_authorize_url(self, state: str = None) -> str:
  21. """获取 SSO 授权 URL"""
  22. params = {
  23. 'response_type': 'code',
  24. 'client_id': self.client_id,
  25. 'redirect_uri': self.redirect_uri,
  26. 'scope': 'profile email',
  27. }
  28. if state:
  29. params['state'] = state
  30. query_string = '&'.join(f'{k}={v}' for k, v in params.items())
  31. return f"{self.base_url}/oauth/authorize?{query_string}"
  32. def exchange_code(self, code: str) -> Dict:
  33. """用授权码换取 SSO access_token"""
  34. url = f"{self.base_url}/oauth/token"
  35. payload = {
  36. 'grant_type': 'authorization_code',
  37. 'code': code,
  38. 'redirect_uri': self.redirect_uri,
  39. 'client_id': self.client_id,
  40. 'client_secret': self.client_secret,
  41. }
  42. try:
  43. response = requests.post(url, data=payload, timeout=10)
  44. response.raise_for_status()
  45. result = response.json()
  46. if 'error' in result:
  47. raise AppApiException(400, _('SSO token exchange failed: {error}').format(
  48. error=result.get('error_description', result['error'])))
  49. return result
  50. except requests.exceptions.RequestException as e:
  51. maxkb_logger.error(f'SSO token exchange request failed: {e}')
  52. raise AppApiException(502, _('Failed to connect to SSO server: {error}').format(error=str(e)))
  53. def get_userinfo(self, access_token: str) -> Dict:
  54. """获取用户信息"""
  55. url = f"{self.base_url}/oauth/userinfo"
  56. headers = {
  57. 'Authorization': f'Bearer {access_token}',
  58. }
  59. try:
  60. response = requests.get(url, headers=headers, timeout=10)
  61. response.raise_for_status()
  62. result = response.json()
  63. if 'error' in result:
  64. raise AppApiException(400, _('SSO userinfo failed: {error}').format(
  65. error=result.get('error_description', result['error'])))
  66. return result
  67. except requests.exceptions.RequestException as e:
  68. maxkb_logger.error(f'SSO userinfo request failed: {e}')
  69. raise AppApiException(502, _('Failed to connect to SSO server: {error}').format(error=str(e)))
  70. def get_sso_client(**kwargs) -> SSOClient:
  71. """获取 SSO 客户端实例"""
  72. return SSOClient(**kwargs)