""" OAuth 2.0 认证路由 处理 SSO 登录流程、token 刷新和用户信息查询。 所有认证统一走 SSO,不再本地签发 JWT。 """ import logging from fastapi import APIRouter, HTTPException, Query, Request, status from pydantic import BaseModel from typing import Optional from config import settings from services.oauth_service import OAuthService from services.auth_service import AuthService from middleware.auth_middleware import token_cache logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/oauth", tags=["oauth"]) class OAuthLoginResponse(BaseModel): """OAuth 登录响应""" authorization_url: str state: str class SSOTokenResponse(BaseModel): """SSO Token 响应(透传 SSO 中心的 token)""" access_token: str refresh_token: str token_type: str = "bearer" user: dict class RefreshRequest(BaseModel): """Token 刷新请求""" refresh_token: str class UserResponse(BaseModel): """用户信息响应""" id: str username: str email: str role: str created_at: str @router.get("/login", response_model=OAuthLoginResponse) async def oauth_login(): """ 启动 OAuth 登录流程。 生成授权 URL 和 state 参数,前端需要保存 state 并重定向到授权 URL。 """ if not settings.OAUTH_ENABLED: raise HTTPException(status_code=503, detail="SSO 认证未配置") state = OAuthService.generate_state() authorization_url = OAuthService.get_authorization_url(state) return OAuthLoginResponse( authorization_url=authorization_url, state=state ) @router.get("/callback") async def oauth_callback( code: str = Query(..., description="OAuth 授权码"), state: str = Query(..., description="State 参数"), ): """ OAuth 回调端点。 用授权码换取 SSO token,获取用户信息并同步到本地数据库, 直接返回 SSO 的 access_token 和 refresh_token(不再本地签发 JWT)。 """ logger.info(f"OAuth callback received: code={code[:10]}..., state={state[:10]}...") if not settings.OAUTH_ENABLED: raise HTTPException(status_code=503, detail="SSO 认证未配置") try: # 1. 用授权码换取 SSO token logger.debug("Exchanging code for token...") token_data = await OAuthService.exchange_code_for_token(code) access_token = token_data.get("access_token") refresh_token = token_data.get("refresh_token", "") if not access_token: raise HTTPException(status_code=400, detail="未能获取访问令牌") # 2. 使用 SSO token 获取完整用户信息(含角色) logger.debug("Verifying SSO token and getting user info...") try: user_info = await OAuthService.verify_sso_token(access_token) logger.debug(f"User info: {user_info.get('username')}, role: {user_info.get('role')}") except HTTPException as e: logger.error(f"verify_sso_token failed: status={e.status_code}, detail={e.detail}") raise except Exception as e: logger.error(f"verify_sso_token unexpected error: {e}", exc_info=True) raise # 3. 同步用户到本地数据库(含角色映射) logger.debug("Syncing user to local database...") try: user = OAuthService.sync_user_from_oauth(user_info) logger.debug(f"User synced: id={user.id}, username={user.username}, role={user.role}") except Exception as e: logger.error(f"sync_user_from_oauth failed: {e}", exc_info=True) raise # 4. 缓存 token → 用户信息映射 token_cache.set(access_token, user_info) # 5. 直接返回 SSO token(不再本地签发 JWT) logger.info(f"OAuth login successful for user: {user.username}") return SSOTokenResponse( access_token=access_token, refresh_token=refresh_token, token_type="bearer", user={ "id": user.id, "username": user.username, "email": user.email, "role": user.role, "created_at": str(user.created_at) } ) except HTTPException: raise except Exception as e: logger.error(f"OAuth callback error: {e}", exc_info=True) raise HTTPException( status_code=400, detail=f"OAuth 登录失败: {str(e)}" ) @router.post("/refresh") async def oauth_refresh(request_body: RefreshRequest): """ Token 刷新端点。 将 refresh 请求转发到 SSO 中心,返回新的 token。 """ logger.info(f"Token refresh requested, refresh_token={request_body.refresh_token[:20]}...") if not settings.OAUTH_ENABLED: raise HTTPException(status_code=503, detail="SSO 认证未配置") try: token_data = await OAuthService.refresh_sso_token(request_body.refresh_token) logger.info("Token refresh successful") return { "access_token": token_data.get("access_token"), "refresh_token": token_data.get("refresh_token", ""), "token_type": token_data.get("token_type", "bearer"), } except HTTPException as e: logger.error(f"Token refresh failed: status={e.status_code}, detail={e.detail}") raise except Exception as e: logger.error(f"Token refresh unexpected error: {e}", exc_info=True) raise HTTPException( status_code=400, detail=f"Token 刷新失败: {str(e)}" ) @router.get("/me", response_model=UserResponse) async def get_current_user(request: Request): """ 获取当前认证用户信息。 用户信息由 AuthMiddleware 从 SSO 验证后填充到 request.state。 """ user_data = getattr(request.state, "user", None) if not user_data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证" ) # 从本地数据库获取完整用户信息 user = AuthService.get_current_user(user_data["id"]) return UserResponse( id=user.id, username=user.username, email=user.email, role=user.role, created_at=str(user.created_at) ) @router.get("/status") async def oauth_status(): """获取 OAuth 配置状态""" return { "enabled": settings.OAUTH_ENABLED, "provider": "SSO" if settings.OAUTH_ENABLED else None, "base_url": settings.OAUTH_BASE_URL if settings.OAUTH_ENABLED else None }