| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import requests
- class SSOClient:
- """与统一认证平台(LQAI-middle-platform)交互的 HTTP 客户端。"""
- def __init__(self, base_url, client_id, client_secret, redirect_uri, scope="email"):
- self.base_url = base_url.rstrip("/")
- self.client_id = client_id
- self.client_secret = client_secret
- self.redirect_uri = redirect_uri
- self.scope = scope
- def get_authorize_url(self, state=None):
- """构建 SSO 授权 URL(GET /oauth/authorize)。"""
- params = {
- "response_type": "code",
- "client_id": self.client_id,
- "redirect_uri": self.redirect_uri,
- "scope": self.scope,
- }
- if state:
- params["state"] = state
- qs = "&".join(f"{k}={v}" for k, v in params.items())
- return f"{self.base_url}/oauth/authorize?{qs}"
- def exchange_code_for_token(self, code):
- """用授权码换取 SSO access_token(POST /oauth/token)。"""
- url = f"{self.base_url}/oauth/token"
- data = {
- "grant_type": "authorization_code",
- "code": code,
- "redirect_uri": self.redirect_uri,
- "client_id": self.client_id,
- "client_secret": self.client_secret,
- }
- resp = requests.post(url, data=data, timeout=15)
- if resp.status_code != 200:
- raise SSOError(f"Token exchange failed: {resp.status_code} {resp.text}")
- result = resp.json()
- if "access_token" not in result:
- raise SSOError(f"Unexpected token response: {result}")
- return result
- def get_userinfo(self, access_token):
- """用 SSO access_token 获取用户信息(GET /oauth/userinfo)。"""
- url = f"{self.base_url}/oauth/userinfo"
- headers = {"Authorization": f"Bearer {access_token}"}
- resp = requests.get(url, headers=headers, timeout=15)
- if resp.status_code != 200:
- raise SSOError(f"UserInfo failed: {resp.status_code} {resp.text}")
- result = resp.json()
- if "error" in result:
- raise SSOError(f"UserInfo error: {result.get('error_description', result['error'])}")
- return result
- class SSOError(Exception):
- """SSO 交互过程中产生的异常。"""
- pass
|