"""OAuth2 unified authentication platform SSO routes."""
import logging
import secrets
import uuid
import os
from typing import Optional
from urllib.parse import quote, urlencode

import httpx
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.models.user import User
from app.services.auth_service import AuthService
from app.services.user_service import UserService
from app.schemas.user_schema import UserCreate
from app.services import local_config_service

logger = logging.getLogger(__name__)

router = APIRouter(tags=["OAuth2 SSO"])

# Environment-level defaults; each may be overridden by the "sso" section of
# the local config (see _get_sso_config).
SSO_BASE_URL = os.getenv("SSO_BASE_URL", "http://192.168.92.61:8200")
SSO_CLIENT_ID = os.getenv("SSO_CLIENT_ID", "")
SSO_CLIENT_SECRET = os.getenv("SSO_CLIENT_SECRET", "")
SSO_REDIRECT_URI = os.getenv("SSO_REDIRECT_URI", "http://localhost:3000/#/auth/callback")
SSO_SCOPE = os.getenv("SSO_SCOPE", "profile email")
SSO_LOGOUT_REDIRECT_URL = os.getenv("SSO_LOGOUT_REDIRECT_URL", "http://192.168.92.61:9200/login")


def _get_sso_config() -> dict:
    """Return the effective SSO settings.

    Values from the ``"sso"`` section of ``local_config_service.get_all()``
    take precedence; any missing or falsy value falls back to the matching
    module-level ``SSO_*`` default.

    Returns:
        A dict with keys ``base_url`` (trailing slashes stripped),
        ``client_id``, ``client_secret``, ``redirect_uri``, ``scope``,
        ``logout_redirect_url`` and ``enabled``.
    """
    try:
        data = local_config_service.get_all()
        # `or {}` also covers an explicit null "sso" entry.
        sso = data.get("sso") or {}
    except Exception as exc:
        # Best effort: local config is optional, so fall back to the
        # environment defaults, but leave a trace of why.
        logger.warning("Failed to load local SSO config, using defaults: %s", exc)
        sso = {}
    if not isinstance(sso, dict):
        sso = {}

    # sso_enable_redirect controls whether the SSO redirect is enabled.
    enabled = sso.get("sso_enable_redirect")
    if enabled is None:
        enabled = True  # SSO is enabled by default

    return {
        "base_url": (sso.get("sso_base_url") or SSO_BASE_URL).rstrip("/"),
        "client_id": sso.get("sso_client_id") or SSO_CLIENT_ID,
        "client_secret": sso.get("sso_client_secret") or SSO_CLIENT_SECRET,
        "redirect_uri": sso.get("sso_redirect_uri") or SSO_REDIRECT_URI,
        "scope": sso.get("sso_scope") or SSO_SCOPE,
        "logout_redirect_url": sso.get("sso_logout_redirect_url") or SSO_LOGOUT_REDIRECT_URL,
        "enabled": bool(enabled),
    }


class ExchangeCodeRequest(BaseModel):
    """Request body for the authorization-code exchange endpoint."""

    code: str


@router.get("/api/sso/config")
def get_sso_public_config():
    """Return SSO config for frontend.

    Returns:
        A dict with ``sso_enabled`` (true only when SSO is enabled and a
        client id is configured), the ``authorize_url`` to redirect the
        browser to, and ``logout_redirect_url``.
    """
    cfg = _get_sso_config()
    # Percent-encode every parameter: the redirect URI may contain '#' and the
    # scope contains spaces, either of which would otherwise truncate or
    # corrupt the query string.
    query = urlencode(
        {
            "response_type": "code",
            "client_id": cfg["client_id"],
            "redirect_uri": cfg["redirect_uri"],
            "scope": cfg["scope"],
        },
        quote_via=quote,
    )
    authorize_url = f"{cfg['base_url']}/oauth/authorize?{query}"
    return {
        "sso_enabled": cfg["enabled"] and bool(cfg["client_id"]),
        "authorize_url": authorize_url,
        "logout_redirect_url": cfg["logout_redirect_url"],
    }


def _json_object_or_none(resp: httpx.Response) -> Optional[dict]:
    """Return the response body as a dict, or None if it is not a JSON object."""
    try:
        payload = resp.json()
    except ValueError:
        return None
    return payload if isinstance(payload, dict) else None


async def _fetch_access_token(client: httpx.AsyncClient, cfg: dict, code: str) -> str:
    """Exchange an authorization code for an SSO access token.

    Raises:
        HTTPException: 502 if the SSO server is unreachable or returns no
            access token; 401 if it rejects the code.
    """
    token_url = f"{cfg['base_url']}/oauth/token"
    token_data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": cfg["redirect_uri"],
        "client_id": cfg["client_id"],
        "client_secret": cfg["client_secret"],
    }
    try:
        token_resp = await client.post(
            token_url,
            data=token_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
    except httpx.RequestError as e:
        logger.error("SSO token exchange failed: %s", e)
        raise HTTPException(status_code=502, detail="Cannot connect to SSO server") from e

    if token_resp.status_code != 200:
        # The error body is optional; tolerate a missing or non-object payload.
        error_data = _json_object_or_none(token_resp) or {}
        error_msg = error_data.get("error_description") or error_data.get("error") or "Invalid code"
        raise HTTPException(status_code=401, detail=f"SSO login failed: {error_msg}")

    token_result = _json_object_or_none(token_resp) or {}
    access_token = token_result.get("access_token")
    if not access_token:
        raise HTTPException(status_code=502, detail="SSO did not return access_token")
    return access_token


async def _fetch_userinfo(client: httpx.AsyncClient, cfg: dict, access_token: str) -> dict:
    """Fetch the user profile from the SSO userinfo endpoint.

    Raises:
        HTTPException: 502 if the request fails, the status is not 200, or
            the body is not a JSON object.
    """
    userinfo_url = f"{cfg['base_url']}/oauth/userinfo"
    try:
        userinfo_resp = await client.get(
            userinfo_url,
            headers={"Authorization": f"Bearer {access_token}"},
        )
    except httpx.RequestError as e:
        logger.error("SSO userinfo failed: %s", e)
        raise HTTPException(status_code=502, detail="Failed to get user info") from e

    if userinfo_resp.status_code != 200:
        raise HTTPException(status_code=502, detail="Failed to get user info")

    userinfo = _json_object_or_none(userinfo_resp)
    if userinfo is None:
        raise HTTPException(status_code=502, detail="Failed to get user info")
    return userinfo


def _sync_local_user(db: Session, userinfo: dict) -> User:
    """Create or update the local user that matches the SSO profile.

    The local account is looked up by username (``username`` or, failing
    that, ``sub`` from the profile). A new account gets a random placeholder
    password; an existing one has its email and nickname refreshed when the
    profile carries different values.

    Raises:
        HTTPException: 400 if the profile carries no username.
    """
    username = userinfo.get("username") or userinfo.get("sub") or ""
    if not username:
        raise HTTPException(status_code=400, detail="SSO did not return username")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        email = userinfo.get("email") or None
        nickname = userinfo.get("real_name") or userinfo.get("username") or username
        # 16 hex characters from the `secrets` module, same shape as before.
        random_password = secrets.token_hex(8)
        user_payload = {"username": username, "password": random_password, "nickname": nickname}
        if email:
            user_payload["email"] = email
        user = UserService(db).create_user(UserCreate(**user_payload))
        logger.info("SSO created new user: %s (id=%s)", username, user.id)
        return user

    updated = False
    if userinfo.get("email") and user.email != userinfo["email"]:
        user.email = userinfo["email"]
        updated = True
    if userinfo.get("real_name") and user.nickname != userinfo["real_name"]:
        user.nickname = userinfo["real_name"]
        updated = True
    if updated:
        db.commit()
    return user


@router.post("/api/oauth/exchange-code")
async def exchange_code(request: ExchangeCodeRequest, db: Session = Depends(get_db)):
    """
    OAuth2 authorization code exchange endpoint.

    Frontend calls this with the code from SSO callback to get a local JWT.

    Flow: exchange the code for an SSO access token, fetch the user profile,
    create or update the matching local user, then issue a local token.

    Raises:
        HTTPException: 500 if client id/secret are not configured; 502 on
            SSO connectivity or malformed-response problems; 401 if the SSO
            server rejects the code; 400 if the profile has no username.
    """
    cfg = _get_sso_config()
    if not cfg["client_id"] or not cfg["client_secret"]:
        raise HTTPException(status_code=500, detail="SSO not configured")

    # One client (15s timeout) is shared by both SSO requests.
    async with httpx.AsyncClient(timeout=15) as client:
        access_token = await _fetch_access_token(client, cfg, request.code)
        userinfo = await _fetch_userinfo(client, cfg, access_token)

    # NOTE(review): the database calls below use a synchronous Session inside
    # an async route — confirm this is acceptable for the expected load.
    user = _sync_local_user(db, userinfo)

    # Issue local JWT
    local_token = AuthService.create_access_token(user.id)
    return {
        "code": 200,
        "message": "success",
        "data": {
            "token": local_token,
            "user": {
                "id": user.id,
                "username": user.username,
                "nickname": user.nickname,
                "email": user.email,
                "phone": user.phone,
                "avatar": user.avatar,
            },
        },
    }