| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- """OAuth2 SSO router for LQAI unified auth platform."""
- import logging
- import uuid
- import os
- import httpx
- from fastapi import APIRouter, Depends, HTTPException, Query
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel
- from sqlalchemy.orm import Session
- from app.database import get_db
- from app.models.user import User
- from app.services.auth_service import AuthService
- from app.services.user_service import UserService
- from app.schemas.user_schema import UserCreate
- from app.services import local_config_service
# Module-level logger and the router on which every endpoint in this file is registered.
logger = logging.getLogger(__name__)
router = APIRouter(tags=["OAuth2 SSO"])

# Environment-provided defaults for the SSO integration. _get_sso_config() prefers
# values from the local config service and falls back to these.
SSO_BASE_URL = os.environ.get("SSO_BASE_URL", "http://192.168.92.61:8200")
SSO_CLIENT_ID = os.environ.get("SSO_CLIENT_ID", "")
SSO_CLIENT_SECRET = os.environ.get("SSO_CLIENT_SECRET", "")
SSO_REDIRECT_URI = os.environ.get("SSO_REDIRECT_URI", "http://localhost:3000/#/auth/callback")
SSO_ADMIN_REDIRECT_URI = os.environ.get("SSO_ADMIN_REDIRECT_URI", "http://localhost:3001/#/sso-callback")
SSO_SCOPE = os.environ.get("SSO_SCOPE", "profile email")
# Note: the environment variable is SSO_LOGOUT_REDIRECT_URL, not SSO_LOGOUT_URL.
SSO_LOGOUT_URL = os.environ.get("SSO_LOGOUT_REDIRECT_URL", "http://192.168.92.61:9200/login")

# An SSO account whose role codes include any of these values is treated as an administrator.
ADMIN_ROLE_CODES = {"admin", "maas_admin", "super_admin"}
def _get_sso_config():
    """Build the effective SSO configuration.

    Values found under the ``"sso"`` key of ``local_config_service.get_all()``
    take precedence; any missing or empty value falls back to the module-level
    ``SSO_*`` defaults.

    Returns:
        dict: ``base_url`` (trailing slashes stripped), ``client_id``,
        ``client_secret``, ``redirect_uri``, ``admin_redirect_uri``, ``scope``,
        ``logout_url`` and the boolean ``enabled``.
    """
    try:
        sso = local_config_service.get_all().get("sso")
    except Exception:
        # Best effort: a broken local config must not take the SSO endpoints
        # down, but the failure should at least be visible in the logs.
        logger.warning("Failed to load local SSO config; using defaults", exc_info=True)
        sso = None
    # A null or otherwise non-mapping "sso" entry is treated as "no overrides"
    # instead of crashing on the .get() calls below.
    if not callable(getattr(sso, "get", None)):
        sso = {}

    # sso_enable_redirect controls whether the SSO redirect is enabled;
    # SSO is enabled by default when the flag is absent.
    # NOTE(review): a string value such as "false" is truthy here — confirm
    # that the config service stores real booleans.
    enabled = sso.get("sso_enable_redirect")
    if enabled is None:
        enabled = True

    return {
        "base_url": (sso.get("sso_base_url") or SSO_BASE_URL).rstrip("/"),
        "client_id": sso.get("sso_client_id") or SSO_CLIENT_ID,
        "client_secret": sso.get("sso_client_secret") or SSO_CLIENT_SECRET,
        "redirect_uri": sso.get("sso_redirect_uri") or SSO_REDIRECT_URI,
        "admin_redirect_uri": sso.get("sso_admin_redirect_uri") or SSO_ADMIN_REDIRECT_URI,
        "scope": sso.get("sso_scope") or SSO_SCOPE,
        "logout_url": sso.get("sso_logout_redirect_url") or SSO_LOGOUT_URL,
        "enabled": bool(enabled),
    }
class CodeRequest(BaseModel):
    """Request body of the exchange-code endpoints."""

    # Authorization code that is forwarded to the SSO server's /oauth/token endpoint.
    code: str
@router.get("/api/sso/config")
def get_sso_public_config():
    """Return the public (user-side) SSO settings.

    Returns:
        dict: ``sso_enabled`` (true only when SSO is enabled and a client id is
        configured), ``authorize_url`` (the SSO ``/oauth/authorize`` URL using
        the user redirect URI and ``state=/``) and ``logout_redirect_url``.
    """
    from urllib.parse import quote, urlencode

    cfg = _get_sso_config()
    # Percent-encode every parameter: an unescaped "#" in the redirect URI would
    # start a URL fragment and silently drop scope/state, and the scope may
    # contain spaces. "/" is left as-is so the trailing "&state=/" is unchanged.
    query = urlencode(
        {
            "response_type": "code",
            "client_id": cfg["client_id"],
            "redirect_uri": cfg["redirect_uri"],
            "scope": cfg["scope"],
            "state": "/",
        },
        safe="/",
        quote_via=quote,
    )
    return {
        "sso_enabled": cfg["enabled"] and bool(cfg["client_id"]),
        "authorize_url": f"{cfg['base_url']}/oauth/authorize?{query}",
        "logout_redirect_url": cfg["logout_url"],
    }
@router.post("/api/oauth/exchange-code")
async def exchange_code(req: CodeRequest, db: Session = Depends(get_db)):
    """Exchange an SSO authorization code for a local user token.

    Posts the code to the SSO ``/oauth/token`` endpoint, fetches
    ``/oauth/userinfo`` with the returned access token, then finds the local
    ``User`` by username (creating it on first login, otherwise syncing email
    and nickname) and issues a local access token.

    Args:
        req: Request body carrying the authorization ``code``.
        db: Database session.

    Returns:
        dict: ``{"code": 200, "message": "success", "data": {"token", "user"}}``.

    Raises:
        HTTPException: 500 if SSO is not configured; 502 if the SSO server is
            unreachable or returns an unusable response; 401 if the code is
            rejected; 400 if userinfo carries no username.
    """
    cfg = _get_sso_config()
    if not cfg["client_id"] or not cfg["client_secret"]:
        raise HTTPException(500, "SSO not configured")

    async with httpx.AsyncClient(timeout=15) as client:
        try:
            resp = await client.post(
                f"{cfg['base_url']}/oauth/token",
                data={
                    "grant_type": "authorization_code",
                    "code": req.code,
                    "redirect_uri": cfg["redirect_uri"],
                    "client_id": cfg["client_id"],
                    "client_secret": cfg["client_secret"],
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
        except httpx.RequestError as exc:
            raise HTTPException(502, "Cannot connect to SSO") from exc

        # The SSO server may answer with a non-JSON or non-object body (e.g. a
        # proxy error page); treat that as an empty payload rather than
        # letting the decode error surface as a 500.
        try:
            token_payload = resp.json()
        except ValueError:
            token_payload = {}
        if not isinstance(token_payload, dict):
            token_payload = {}

        if resp.status_code != 200:
            raise HTTPException(401, token_payload.get("error_description") or "Invalid code")
        access_token = token_payload.get("access_token")
        if not access_token:
            raise HTTPException(502, "No access_token from SSO")

        try:
            info_resp = await client.get(
                f"{cfg['base_url']}/oauth/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
        except httpx.RequestError as exc:
            raise HTTPException(502, "Failed to get userinfo") from exc

    if info_resp.status_code != 200:
        raise HTTPException(502, "Failed to get userinfo")
    try:
        userinfo = info_resp.json()
    except ValueError as exc:
        raise HTTPException(502, "Failed to get userinfo") from exc
    if not isinstance(userinfo, dict):
        raise HTTPException(502, "Failed to get userinfo")

    username = userinfo.get("username") or userinfo.get("sub") or ""
    if not username:
        raise HTTPException(400, "No username from SSO")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        # First SSO login: create the local account with a random throwaway
        # password, since authentication is delegated to the SSO server.
        payload = {
            "username": username,
            "password": uuid.uuid4().hex[:16],
            "nickname": userinfo.get("real_name") or username,
        }
        if userinfo.get("email"):
            payload["email"] = userinfo["email"]
        user = UserService(db).create_user(UserCreate(**payload))
        logger.info("SSO new user: %s", username)
    else:
        # Returning user: keep email and nickname in sync with the SSO profile.
        changed = False
        if userinfo.get("email") and user.email != userinfo["email"]:
            user.email = userinfo["email"]
            changed = True
        if userinfo.get("real_name") and user.nickname != userinfo["real_name"]:
            user.nickname = userinfo["real_name"]
            changed = True
        if changed:
            db.commit()

    token = AuthService.create_access_token(user.id)
    return {
        "code": 200,
        "message": "success",
        "data": {
            "token": token,
            "user": {
                "id": user.id,
                "username": user.username,
                "nickname": user.nickname,
                "email": user.email,
                "phone": user.phone,
                "avatar": user.avatar,
            },
        },
    }
# ============ OAuth callback (handles the code sent back by the SSO server) ============
def _js_string(value):
    """Serialize *value* as a JavaScript string literal safe inside an inline <script>."""
    import json

    # json.dumps escapes quotes, backslashes, newlines and non-ASCII characters;
    # "<", ">" and "&" are additionally escaped so the literal can never close
    # the surrounding <script> element.
    return (
        json.dumps(str(value))
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )


def _sso_failure_page(message):
    """Build an HTML page that alerts *message* and then navigates to ``/login``."""
    return (
        "<html><body><script>"
        f"alert({_js_string(message)});window.location='/login';"
        "</script></body></html>"
    )


def _safe_redirect_target(target, trusted_url=""):
    """Return *target* if it is a same-site path or shares its origin with *trusted_url*, else ``"/"``."""
    from urllib.parse import urlsplit

    # Browsers strip control characters and treat "\" like "/", so reject both.
    if not target or "\\" in target or any(ch < " " or ch == "\x7f" for ch in target):
        return "/"
    if target.startswith("/"):
        # "//host" is a protocol-relative URL, i.e. an off-site redirect.
        return "/" if target.startswith("//") else target
    try:
        candidate = urlsplit(target)
        trusted = urlsplit(trusted_url)
    except ValueError:
        return "/"
    same_origin = (
        candidate.scheme in ("http", "https")
        and bool(candidate.netloc)
        and (candidate.scheme, candidate.netloc.lower()) == (trusted.scheme, trusted.netloc.lower())
    )
    return target if same_origin else "/"


async def _exchange_code_and_build_html(code: str, redirect_path: str, is_admin: bool = False) -> str:
    """Exchange *code* for a local JWT and return an auto-login HTML page.

    On success the page stores the local token in ``localStorage`` under
    ``aigc_space_token`` and navigates to *redirect_path*. On any failure the
    page shows an alert and navigates to ``/login``.

    Args:
        code: Authorization code sent back by the SSO server.
        redirect_path: Where to navigate after login. It comes from the
            request's ``state`` parameter, so only same-site paths (or URLs on
            the configured redirect URI's origin) are honoured; anything else
            falls back to ``/``.
        is_admin: When true, use the admin redirect URI, require an admin role
            in the SSO userinfo, and issue an admin token for an ``AdminUser``.

    Returns:
        The HTML document as a string.
    """
    from app.database import SessionLocal
    from app.models.admin import AdminUser
    from app.services.admin_auth_service import AdminAuthService

    cfg = _get_sso_config()
    if not cfg["client_id"] or not cfg["client_secret"]:
        return _sso_failure_page("SSO not configured")
    callback_uri = cfg["admin_redirect_uri"] if is_admin else cfg["redirect_uri"]

    async with httpx.AsyncClient(timeout=15) as client:
        try:
            resp = await client.post(
                f"{cfg['base_url']}/oauth/token",
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": callback_uri,
                    "client_id": cfg["client_id"],
                    "client_secret": cfg["client_secret"],
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
        except httpx.RequestError:
            return _sso_failure_page("Cannot connect to SSO")

        # Tolerate non-JSON / non-object bodies instead of raising.
        try:
            token_payload = resp.json()
        except ValueError:
            token_payload = {}
        if not isinstance(token_payload, dict):
            token_payload = {}

        if resp.status_code != 200:
            return _sso_failure_page(token_payload.get("error_description") or "SSO login failed")
        access_token = token_payload.get("access_token")
        if not access_token:
            return _sso_failure_page("No access_token from SSO")

        try:
            info_resp = await client.get(
                f"{cfg['base_url']}/oauth/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
        except httpx.RequestError:
            return _sso_failure_page("Failed to get userinfo")

    userinfo = None
    if info_resp.status_code == 200:
        try:
            userinfo = info_resp.json()
        except ValueError:
            userinfo = None
    if not isinstance(userinfo, dict):
        return _sso_failure_page("Failed to get userinfo")

    username = userinfo.get("username") or userinfo.get("sub") or ""
    if not username:
        return _sso_failure_page("No username from SSO")

    if is_admin:
        # Same rule as the admin exchange-code endpoint: an admin token is only
        # issued when the SSO roles contain one of ADMIN_ROLE_CODES.
        roles = userinfo.get("roles")
        if not isinstance(roles, (list, tuple, set)):
            roles = []
        role_codes = set()
        for role in roles:
            role_code = role.get("code", "") if isinstance(role, dict) else role
            if isinstance(role_code, str):
                role_codes.add(role_code)
        if not role_codes.intersection(ADMIN_ROLE_CODES):
            logger.warning("SSO admin callback rejected for non-admin user: %s", username)
            return _sso_failure_page("You do not have admin access to this platform")

    db = SessionLocal()
    try:
        if is_admin:
            admin = db.query(AdminUser).filter(AdminUser.username == username).first()
            if not admin:
                # First admin login: create the account with a random throwaway password.
                admin = AdminUser(
                    username=username,
                    password_hash=AdminAuthService.hash_password(uuid.uuid4().hex[:16]),
                    nickname=userinfo.get("real_name") or username,
                    status="active",
                )
                db.add(admin)
                db.commit()
                db.refresh(admin)
            local_token = AdminAuthService.create_token(admin.id, admin.username)
        else:
            user = db.query(User).filter(User.username == username).first()
            if not user:
                payload = {
                    "username": username,
                    "password": uuid.uuid4().hex[:16],
                    "nickname": userinfo.get("real_name") or username,
                }
                if userinfo.get("email"):
                    payload["email"] = userinfo["email"]
                user = UserService(db).create_user(UserCreate(**payload))
            elif userinfo.get("email") and user.email != userinfo["email"]:
                user.email = userinfo["email"]
                db.commit()
            local_token = AuthService.create_access_token(user.id)
    finally:
        db.close()

    target = _safe_redirect_target(redirect_path, callback_uri)
    return (
        "<!DOCTYPE html><html><head><title>登录中...</title></head><body>"
        "<script>"
        f"localStorage.setItem('aigc_space_token',{_js_string(local_token)});"
        f"window.location.replace({_js_string(target)});"
        "</script>"
        "<p>正在登录,请稍候...</p>"
        "</body></html>"
    )
@router.get("/api/oauth/callback")
async def oauth_user_callback(code: str = Query(...), state: str = Query(default="/")):
    """User-side OAuth callback.

    The SSO server redirects the browser here; the backend exchanges the code
    for a token and answers with the auto-login HTML page.
    """
    page = await _exchange_code_and_build_html(code=code, redirect_path=state, is_admin=False)
    return HTMLResponse(content=page)
@router.get("/api/admin/oauth/callback")
async def oauth_admin_callback(code: str = Query(...), state: str = Query(default="/admin/")):
    """Admin-console OAuth callback.

    Works like the user-side callback, but the issued token is an
    administrator token.
    """
    page = await _exchange_code_and_build_html(code=code, redirect_path=state, is_admin=True)
    return HTMLResponse(content=page)
# ============ Admin SSO endpoints ============
@router.get("/api/admin/sso/config")
def get_admin_sso_config():
    """Return the admin-console SSO settings.

    Returns:
        dict: ``sso_enabled`` (true only when SSO is enabled and a client id is
        configured) and ``authorize_url`` (the SSO ``/oauth/authorize`` URL
        using the admin redirect URI and ``state=/admin/``).
    """
    from urllib.parse import quote, urlencode

    cfg = _get_sso_config()
    # Percent-encode every parameter: an unescaped "#" in the redirect URI would
    # start a URL fragment and silently drop scope/state, and the scope may
    # contain spaces. "/" is left as-is so the trailing "&state=/admin/" is unchanged.
    query = urlencode(
        {
            "response_type": "code",
            "client_id": cfg["client_id"],
            "redirect_uri": cfg["admin_redirect_uri"],
            "scope": cfg["scope"],
            "state": "/admin/",
        },
        safe="/",
        quote_via=quote,
    )
    return {
        "sso_enabled": cfg["enabled"] and bool(cfg["client_id"]),
        "authorize_url": f"{cfg['base_url']}/oauth/authorize?{query}",
    }
@router.post("/api/admin/oauth/exchange-code")
async def admin_exchange_code(req: CodeRequest, db: Session = Depends(get_db)):
    """Exchange an SSO authorization code for an administrator token.

    Similar to the user-side exchange, but additionally requires that the
    ``roles`` returned by the SSO userinfo contain one of ``ADMIN_ROLE_CODES``.
    The matching ``AdminUser`` is created on first login.

    Args:
        req: Request body carrying the authorization ``code``.
        db: Database session.

    Returns:
        dict: ``{"code": 200, "message": "success", "data": {"token", "admin"}}``.

    Raises:
        HTTPException: 500 if SSO is not configured; 502 if the SSO server is
            unreachable or returns an unusable response; 401 if the code is
            rejected; 403 if the account has no admin role; 400 if userinfo
            carries no username.
    """
    from app.models.admin import AdminUser
    from app.services.admin_auth_service import AdminAuthService

    cfg = _get_sso_config()
    if not cfg["client_id"] or not cfg["client_secret"]:
        raise HTTPException(500, "SSO not configured")

    async with httpx.AsyncClient(timeout=15) as client:
        # Exchange the code for an SSO access token.
        try:
            resp = await client.post(
                f"{cfg['base_url']}/oauth/token",
                data={
                    "grant_type": "authorization_code",
                    "code": req.code,
                    "redirect_uri": cfg["admin_redirect_uri"],
                    "client_id": cfg["client_id"],
                    "client_secret": cfg["client_secret"],
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
        except httpx.RequestError as exc:
            raise HTTPException(502, "Cannot connect to SSO") from exc

        # Tolerate non-JSON / non-object bodies instead of failing with a 500.
        try:
            token_payload = resp.json()
        except ValueError:
            token_payload = {}
        if not isinstance(token_payload, dict):
            token_payload = {}

        if resp.status_code != 200:
            raise HTTPException(401, token_payload.get("error_description") or "Invalid code")
        access_token = token_payload.get("access_token")
        if not access_token:
            raise HTTPException(502, "No access_token from SSO")

        # Fetch the user profile.
        try:
            info_resp = await client.get(
                f"{cfg['base_url']}/oauth/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
        except httpx.RequestError as exc:
            raise HTTPException(502, "Failed to get userinfo") from exc

    if info_resp.status_code != 200:
        raise HTTPException(502, "Failed to get userinfo")
    try:
        userinfo = info_resp.json()
    except ValueError as exc:
        raise HTTPException(502, "Failed to get userinfo") from exc
    if not isinstance(userinfo, dict):
        raise HTTPException(502, "Failed to get userinfo")

    # Verify the admin role. Roles may be given as strings or as objects with
    # a "code" field; a missing/null/malformed list counts as "no roles".
    roles = userinfo.get("roles")
    if not isinstance(roles, (list, tuple, set)):
        roles = []
    role_codes = set()
    for role in roles:
        role_code = role.get("code", "") if isinstance(role, dict) else role
        if isinstance(role_code, str):
            role_codes.add(role_code)
    if not role_codes.intersection(ADMIN_ROLE_CODES):
        raise HTTPException(403, "You do not have admin access to this platform")

    # Find or create the admin account.
    username = userinfo.get("username") or userinfo.get("sub") or ""
    if not username:
        raise HTTPException(400, "No username from SSO")

    # NOTE(review): an existing admin is accepted regardless of its status
    # field — confirm whether non-"active" accounts should be refused.
    admin = db.query(AdminUser).filter(AdminUser.username == username).first()
    if not admin:
        # Auto-create the admin account with a random throwaway password.
        admin = AdminUser(
            username=username,
            password_hash=AdminAuthService.hash_password(uuid.uuid4().hex[:16]),
            nickname=userinfo.get("real_name") or username,
            status="active",
        )
        db.add(admin)
        db.commit()
        db.refresh(admin)
        logger.info("SSO created admin user: %s (id=%s)", username, admin.id)

    # Issue the administrator JWT.
    token = AdminAuthService.create_token(admin.id, admin.username)
    return {
        "code": 200,
        "message": "success",
        "data": {
            "token": token,
            "admin": {
                "id": admin.id,
                "username": admin.username,
                "nickname": admin.nickname,
            },
        },
    }
|