sso_client.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import requests
  2. class SSOClient:
  3. """与统一认证平台(LQAI-middle-platform)交互的 HTTP 客户端。"""
  4. def __init__(self, base_url, client_id, client_secret, redirect_uri, scope="email"):
  5. self.base_url = base_url.rstrip("/")
  6. self.client_id = client_id
  7. self.client_secret = client_secret
  8. self.redirect_uri = redirect_uri
  9. self.scope = scope
  10. def get_authorize_url(self, state=None):
  11. """构建 SSO 授权 URL(GET /oauth/authorize)。"""
  12. params = {
  13. "response_type": "code",
  14. "client_id": self.client_id,
  15. "redirect_uri": self.redirect_uri,
  16. "scope": self.scope,
  17. }
  18. if state:
  19. params["state"] = state
  20. qs = "&".join(f"{k}={v}" for k, v in params.items())
  21. return f"{self.base_url}/oauth/authorize?{qs}"
  22. def exchange_code_for_token(self, code):
  23. """用授权码换取 SSO access_token(POST /oauth/token)。"""
  24. url = f"{self.base_url}/oauth/token"
  25. data = {
  26. "grant_type": "authorization_code",
  27. "code": code,
  28. "redirect_uri": self.redirect_uri,
  29. "client_id": self.client_id,
  30. "client_secret": self.client_secret,
  31. }
  32. resp = requests.post(url, data=data, timeout=15)
  33. if resp.status_code != 200:
  34. raise SSOError(f"Token exchange failed: {resp.status_code} {resp.text}")
  35. result = resp.json()
  36. if "access_token" not in result:
  37. raise SSOError(f"Unexpected token response: {result}")
  38. return result
  39. def get_userinfo(self, access_token):
  40. """用 SSO access_token 获取用户信息(GET /oauth/userinfo)。"""
  41. url = f"{self.base_url}/oauth/userinfo"
  42. headers = {"Authorization": f"Bearer {access_token}"}
  43. resp = requests.get(url, headers=headers, timeout=15)
  44. if resp.status_code != 200:
  45. raise SSOError(f"UserInfo failed: {resp.status_code} {resp.text}")
  46. result = resp.json()
  47. if "error" in result:
  48. raise SSOError(f"UserInfo error: {result.get('error_description', result['error'])}")
  49. return result
  50. class SSOError(Exception):
  51. """SSO 交互过程中产生的异常。"""
  52. pass