"""OAuth2 SSO router for LQAI unified auth platform."""

import logging
import uuid
import os

import httpx
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.models.user import User
from app.services.auth_service import AuthService
from app.services.user_service import UserService
from app.schemas.user_schema import UserCreate
from app.services import local_config_service

logger = logging.getLogger(__name__)
router = APIRouter(tags=["OAuth2 SSO"])

# Environment-derived defaults; values from the local config override them
# (see _get_sso_config).
SSO_BASE_URL = os.getenv("SSO_BASE_URL", "http://192.168.92.61:8200")
SSO_CLIENT_ID = os.getenv("SSO_CLIENT_ID", "")
SSO_CLIENT_SECRET = os.getenv("SSO_CLIENT_SECRET", "")
SSO_REDIRECT_URI = os.getenv("SSO_REDIRECT_URI", "http://localhost:3000/#/auth/callback")
SSO_ADMIN_REDIRECT_URI = os.getenv("SSO_ADMIN_REDIRECT_URI", "http://localhost:3001/#/sso-callback")
SSO_SCOPE = os.getenv("SSO_SCOPE", "profile email")
SSO_LOGOUT_URL = os.getenv("SSO_LOGOUT_REDIRECT_URL", "http://192.168.92.61:9200/login")

# A user is treated as an administrator when their SSO role codes include
# any of these values.
ADMIN_ROLE_CODES = {"super_admin", "maas_admin", "admin"}

# String spellings of "disabled" accepted for the sso_enable_redirect flag.
_DISABLED_STRINGS = frozenset({"", "0", "false", "no", "off"})


def _get_sso_config():
    """Return the effective SSO settings as a plain dict.

    Values found under the ``"sso"`` key of the local config take precedence;
    missing or empty values fall back to the environment-derived module
    constants. Reading the local config is best-effort: if it fails, or the
    ``"sso"`` entry is not a dict, the defaults are used.

    Returns:
        A dict with the keys ``base_url`` (trailing slashes removed),
        ``client_id``, ``client_secret``, ``redirect_uri``,
        ``admin_redirect_uri``, ``scope``, ``logout_url`` and ``enabled``
        (a bool).
    """
    try:
        data = local_config_service.get_all()
        # "or {}" also covers an "sso" key that is present but null.
        sso = data.get("sso") or {}
    except Exception:
        # Deliberate best-effort: an unreadable local config must not break
        # login, but the failure should at least be visible in the logs.
        logger.warning(
            "Failed to read local SSO config; using environment defaults",
            exc_info=True,
        )
        sso = {}
    if not isinstance(sso, dict):
        sso = {}

    # sso_enable_redirect controls whether the SSO redirect is enabled.
    enabled = sso.get("sso_enable_redirect")
    if enabled is None:
        enabled = True  # SSO is enabled by default.
    elif isinstance(enabled, str):
        # bool("false") is True, so interpret textual flags explicitly.
        enabled = enabled.strip().lower() not in _DISABLED_STRINGS

    return {
        "base_url": (sso.get("sso_base_url") or SSO_BASE_URL).rstrip("/"),
        "client_id": sso.get("sso_client_id") or SSO_CLIENT_ID,
        "client_secret": sso.get("sso_client_secret") or SSO_CLIENT_SECRET,
        "redirect_uri": sso.get("sso_redirect_uri") or SSO_REDIRECT_URI,
        "admin_redirect_uri": sso.get("sso_admin_redirect_uri") or SSO_ADMIN_REDIRECT_URI,
        "scope": sso.get("sso_scope") or SSO_SCOPE,
        "logout_url": sso.get("sso_logout_redirect_url") or SSO_LOGOUT_URL,
        "enabled": bool(enabled),
    }


class CodeRequest(BaseModel):
    """Request body carrying the OAuth2 authorization code to exchange."""

    code: str
def _build_authorize_url(cfg, redirect_uri, state):
    """Build the SSO ``/oauth/authorize`` URL for the given redirect URI.

    ``client_id``, ``redirect_uri`` and ``scope`` are percent-encoded so that
    characters such as ``#``, ``&`` or spaces inside them cannot truncate or
    corrupt the query string. ``state`` is appended verbatim; callers in this
    module only pass fixed path literals.
    """
    from urllib.parse import quote

    return (
        f"{cfg['base_url']}/oauth/authorize"
        f"?response_type=code&client_id={quote(str(cfg['client_id']), safe='')}"
        f"&redirect_uri={quote(str(redirect_uri), safe='')}"
        f"&scope={quote(str(cfg['scope']), safe='')}"
        f"&state={state}"
    )


def _json_object(resp):
    """Return the JSON body of *resp* as a dict, or ``{}`` if it is not one."""
    try:
        body = resp.json()
    except ValueError:
        # Covers invalid JSON as well as undecodable bytes.
        return {}
    return body if isinstance(body, dict) else {}


async def _fetch_sso_userinfo(cfg, code, redirect_uri):
    """Exchange an authorization code for the SSO userinfo document.

    Posts the code to ``/oauth/token`` and then calls ``/oauth/userinfo``
    with the returned access token.

    Returns:
        The userinfo payload as a dict (empty if the body was not a JSON
        object).

    Raises:
        HTTPException: 502 when the SSO server is unreachable, returns no
            access token, or the userinfo call fails; 401 when the token
            endpoint rejects the code.
    """
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            resp = await client.post(
                f"{cfg['base_url']}/oauth/token",
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": redirect_uri,
                    "client_id": cfg["client_id"],
                    "client_secret": cfg["client_secret"],
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
        except httpx.RequestError as exc:
            raise HTTPException(502, "Cannot connect to SSO") from exc

        if resp.status_code != 200:
            err = _json_object(resp)
            raise HTTPException(401, err.get("error_description") or "Invalid code")

        access_token = _json_object(resp).get("access_token")
        if not access_token:
            raise HTTPException(502, "No access_token from SSO")

        try:
            info_resp = await client.get(
                f"{cfg['base_url']}/oauth/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
        except httpx.RequestError as exc:
            raise HTTPException(502, "Failed to get userinfo") from exc

    if info_resp.status_code != 200:
        raise HTTPException(502, "Failed to get userinfo")
    return _json_object(info_resp)


def _sso_username(userinfo):
    """Return the local username for an SSO userinfo payload ("" if none)."""
    return userinfo.get("username") or userinfo.get("sub") or ""


def _has_admin_role(userinfo):
    """Return True if the userinfo ``roles`` include any ADMIN_ROLE_CODES entry.

    Each role may be a dict with a ``code`` key or a plain string; entries of
    any other type are ignored. A missing or null ``roles`` means no roles.
    """
    role_codes = set()
    for role in userinfo.get("roles") or []:
        if isinstance(role, dict):
            role_codes.add(role.get("code", ""))
        elif isinstance(role, str):
            role_codes.add(role)
    return not role_codes.isdisjoint(ADMIN_ROLE_CODES)


def _get_or_create_user(db, userinfo, username):
    """Find the local user for *username*, creating or refreshing it from SSO.

    A new user gets a random 16-character placeholder password. For an
    existing user, ``email`` and ``nickname`` are updated when the SSO
    values are present and differ.
    """
    email = userinfo.get("email")
    real_name = userinfo.get("real_name")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        payload = {
            "username": username,
            "password": uuid.uuid4().hex[:16],
            "nickname": real_name or username,
        }
        if email:
            payload["email"] = email
        user = UserService(db).create_user(UserCreate(**payload))
        logger.info("SSO new user: %s", username)
        return user

    changed = False
    if email and user.email != email:
        user.email = email
        changed = True
    if real_name and user.nickname != real_name:
        user.nickname = real_name
        changed = True
    if changed:
        db.commit()
    return user


def _get_or_create_admin(db, userinfo, username):
    """Find the admin account for *username*, auto-creating it if absent.

    Callers must have verified the admin role before calling this.
    """
    # Imported lazily, as the original code did for the admin modules.
    from app.models.admin import AdminUser
    from app.services.admin_auth_service import AdminAuthService

    admin = db.query(AdminUser).filter(AdminUser.username == username).first()
    if not admin:
        admin = AdminUser(
            username=username,
            password_hash=AdminAuthService.hash_password(uuid.uuid4().hex[:16]),
            nickname=userinfo.get("real_name") or username,
            status="active",
        )
        db.add(admin)
        db.commit()
        db.refresh(admin)
        logger.info("SSO created admin user: %s (id=%s)", username, admin.id)
    return admin


@router.get("/api/sso/config")
def get_sso_public_config():
    """Return the public SSO settings used by the user-facing frontend."""
    cfg = _get_sso_config()
    return {
        "sso_enabled": cfg["enabled"] and bool(cfg["client_id"]),
        "authorize_url": _build_authorize_url(cfg, cfg["redirect_uri"], "/"),
        "logout_redirect_url": cfg["logout_url"],
    }


@router.post("/api/oauth/exchange-code")
async def exchange_code(req: CodeRequest, db: Session = Depends(get_db)):
    """Exchange an SSO authorization code for a local user token.

    Creates the local user on first login and refreshes email/nickname from
    SSO on later logins.

    Raises:
        HTTPException: 500 if SSO is not configured, 400 if SSO returns no
            username, plus the errors raised while talking to the SSO server
            (401/502).
    """
    cfg = _get_sso_config()
    if not cfg["client_id"] or not cfg["client_secret"]:
        raise HTTPException(500, "SSO not configured")

    userinfo = await _fetch_sso_userinfo(cfg, req.code, cfg["redirect_uri"])
    username = _sso_username(userinfo)
    if not username:
        raise HTTPException(400, "No username from SSO")

    user = _get_or_create_user(db, userinfo, username)
    token = AuthService.create_access_token(user.id)
    return {
        "code": 200,
        "message": "success",
        "data": {
            "token": token,
            "user": {
                "id": user.id,
                "username": user.username,
                "nickname": user.nickname,
                "email": user.email,
                "phone": user.phone,
                "avatar": user.avatar,
            },
        },
    }


# ============ OAuth callbacks (handle the code sent back by the SSO server) ============


async def _exchange_code_and_build_html(code: str, redirect_path: str, is_admin: bool = False) -> str:
    """Exchange *code* for a local JWT and return the auto-login HTML page.

    Any failure (SSO error, missing username, or - for admin logins - a
    missing admin role) is logged and yields an empty page.

    Args:
        code: Authorization code sent back by the SSO server.
        redirect_path: Path to continue to after login; defaults to "/".
        is_admin: Issue an admin token instead of a user token. The SSO roles
            must then include one of ADMIN_ROLE_CODES.
    """
    import html as _html

    from app.database import SessionLocal
    from app.services.admin_auth_service import AdminAuthService

    cfg = _get_sso_config()
    redirect_uri = cfg["admin_redirect_uri"] if is_admin else cfg["redirect_uri"]
    try:
        userinfo = await _fetch_sso_userinfo(cfg, code, redirect_uri)
    except HTTPException as exc:
        logger.warning("SSO callback login failed: %s", exc.detail)
        return ""

    username = _sso_username(userinfo)
    if not username:
        logger.warning("SSO callback login failed: no username in userinfo")
        return ""

    # Without this check any SSO user who reaches the admin callback would be
    # given an auto-created admin account and an admin token.
    if is_admin and not _has_admin_role(userinfo):
        logger.warning("SSO admin callback denied for %s: no admin role", username)
        return ""

    db = SessionLocal()
    try:
        if is_admin:
            admin = _get_or_create_admin(db, userinfo, username)
            local_token = AdminAuthService.create_token(admin.id, admin.username)
        else:
            user = _get_or_create_user(db, userinfo, username)
            local_token = AuthService.create_access_token(user.id)
    finally:
        db.close()

    safe_redirect = _html.escape(redirect_path or "/")
    # NOTE(review): the literal below contains no markup and never references
    # local_token or safe_redirect, so the page cannot complete the login as
    # written. It looks like the HTML/script that stores the token and
    # navigates to the redirect path is missing - confirm against the
    # deployed page before relying on this endpoint. The same applies to the
    # empty pages returned on failure above.
    return (
        "登录中..."
        "\n\n正在登录,请稍候...\n\n"
    )


@router.get("/api/oauth/callback")
async def oauth_user_callback(code: str = Query(...), state: str = Query(default="/")):
    """User-side OAuth callback.

    The SSO server redirects here; the backend exchanges the code for a
    token and returns the auto-login HTML page.
    """
    html_content = await _exchange_code_and_build_html(code, state, is_admin=False)
    return HTMLResponse(content=html_content)


@router.get("/api/admin/oauth/callback")
async def oauth_admin_callback(code: str = Query(...), state: str = Query(default="/admin/")):
    """Admin-console OAuth callback: same flow, but issues an admin token."""
    html_content = await _exchange_code_and_build_html(code, state, is_admin=True)
    return HTMLResponse(content=html_content)


# ============ Admin SSO endpoints ============


@router.get("/api/admin/sso/config")
def get_admin_sso_config():
    """Return the admin-console SSO settings, including the admin authorize URL."""
    cfg = _get_sso_config()
    return {
        "sso_enabled": cfg["enabled"] and bool(cfg["client_id"]),
        "authorize_url": _build_authorize_url(cfg, cfg["admin_redirect_uri"], "/admin/"),
    }


@router.post("/api/admin/oauth/exchange-code")
async def admin_exchange_code(req: CodeRequest, db: Session = Depends(get_db)):
    """Exchange an SSO authorization code for an admin token.

    Works like the user-side exchange, but additionally requires the roles
    returned by SSO to include an admin role. The admin account is created
    automatically on first login.

    Raises:
        HTTPException: 500 if SSO is not configured, 403 if the user has no
            admin role, 400 if SSO returns no username, plus the errors
            raised while talking to the SSO server (401/502).
    """
    from app.services.admin_auth_service import AdminAuthService

    cfg = _get_sso_config()
    if not cfg["client_id"] or not cfg["client_secret"]:
        raise HTTPException(500, "SSO not configured")

    userinfo = await _fetch_sso_userinfo(cfg, req.code, cfg["admin_redirect_uri"])

    # Verify the admin role before touching any local account.
    if not _has_admin_role(userinfo):
        raise HTTPException(403, "You do not have admin access to this platform")

    username = _sso_username(userinfo)
    if not username:
        raise HTTPException(400, "No username from SSO")

    admin = _get_or_create_admin(db, userinfo, username)

    # Issue the admin JWT.
    token = AdminAuthService.create_token(admin.id, admin.username)
    return {
        "code": 200,
        "message": "success",
        "data": {
            "token": token,
            "admin": {
                "id": admin.id,
                "username": admin.username,
                "nickname": admin.nickname,
            },
        },
    }