""" OAuth 2.0 认证路由 处理 OAuth 登录流程 """ from fastapi import APIRouter, HTTPException, Query from fastapi.responses import RedirectResponse from pydantic import BaseModel from typing import Optional from config import settings from services.oauth_service import OAuthService from services.jwt_service import JWTService from schemas.auth import TokenResponse, UserResponse router = APIRouter(prefix="/api/oauth", tags=["oauth"]) class OAuthLoginResponse(BaseModel): """OAuth 登录响应""" authorization_url: str state: str @router.get("/login", response_model=OAuthLoginResponse) async def oauth_login(): """ 启动 OAuth 登录流程 生成授权 URL 和 state 参数,前端需要保存 state 并重定向到授权 URL Returns: 包含授权 URL 和 state 的响应 """ if not settings.OAUTH_ENABLED: raise HTTPException(status_code=400, detail="OAuth 登录未启用") # 生成 state 参数 state = OAuthService.generate_state() # 构建授权 URL authorization_url = OAuthService.get_authorization_url(state) return OAuthLoginResponse( authorization_url=authorization_url, state=state ) @router.get("/callback", response_model=TokenResponse) async def oauth_callback( code: str = Query(..., description="OAuth 授权码"), state: str = Query(..., description="State 参数"), ): """ OAuth 回调端点 处理 OAuth 认证中心的回调,用授权码换取令牌,获取用户信息, 并创建或更新本地用户记录 Args: code: OAuth 授权码 state: State 参数(前端需要验证) Returns: JWT tokens 和用户信息 """ if not settings.OAUTH_ENABLED: raise HTTPException(status_code=400, detail="OAuth 登录未启用") try: # 1. 用授权码换取访问令牌 token_data = await OAuthService.exchange_code_for_token(code) access_token = token_data.get("access_token") if not access_token: raise HTTPException(status_code=400, detail="未能获取访问令牌") # 2. 使用访问令牌获取用户信息 oauth_user_info = await OAuthService.get_user_info(access_token) # 3. 同步用户到本地数据库 user = OAuthService.sync_user_from_oauth(oauth_user_info) # 4. 生成本地 JWT tokens user_data = { "id": user.id, "username": user.username, "email": user.email, "role": user.role } jwt_access_token = JWTService.create_access_token(user_data) jwt_refresh_token = JWTService.create_refresh_token(user_data) # 5. 返回 tokens 和用户信息 return TokenResponse( access_token=jwt_access_token, refresh_token=jwt_refresh_token, token_type="bearer", user=UserResponse( id=user.id, username=user.username, email=user.email, role=user.role, created_at=user.created_at ) ) except Exception as e: raise HTTPException( status_code=400, detail=f"OAuth 登录失败: {str(e)}" ) @router.get("/status") async def oauth_status(): """ 获取 OAuth 配置状态 Returns: OAuth 是否启用及相关配置信息 """ return { "enabled": settings.OAUTH_ENABLED, "provider": "SSO" if settings.OAUTH_ENABLED else None, "base_url": settings.OAUTH_BASE_URL if settings.OAUTH_ENABLED else None }