""" OAuth 2.0 认证路由 处理 SSO 登录流程、本地 JWT 签发、token 刷新和用户信息查询。 """ import logging import httpx from fastapi import APIRouter, HTTPException, Request, status from pydantic import BaseModel from config import settings from services.oauth_service import OAuthService from services.auth_service import AuthService from services import jwt_service logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/oauth", tags=["oauth"]) class OAuthLoginResponse(BaseModel): """OAuth 登录响应""" authorization_url: str state: str class SSOTokenResponse(BaseModel): """SSO Token 响应(本地签发的 JWT)""" token: str refresh_token: str token_type: str = "bearer" user: dict class ExchangeCodeRequest(BaseModel): """授权码交换请求""" code: str class ExchangeCodeResponse(BaseModel): """授权码交换响应(本地签发的 JWT)""" token: str refresh_token: str token_type: str = "bearer" user: dict class RefreshRequest(BaseModel): """Token 刷新请求""" refresh_token: str class UserResponse(BaseModel): """用户信息响应""" id: str username: str email: str role: str created_at: str @router.get("/login", response_model=OAuthLoginResponse) async def oauth_login(): """ 启动 OAuth 登录流程。 生成授权 URL 和 state 参数。 """ state = OAuthService.generate_state() authorization_url = OAuthService.get_authorization_url(state) return OAuthLoginResponse( authorization_url=authorization_url, state=state, ) @router.post("/exchange-code", response_model=ExchangeCodeResponse) async def exchange_code(request_body: ExchangeCodeRequest): """ 授权码交换端点(前端调用)。 前端从 SSO 回调拿到 code 后,调用此接口换取本地 JWT。 """ logger.info(f"Exchange code received: code={request_body.code[:10]}...") # 1. 用授权码换取 SSO access_token token_data = await OAuthService.exchange_code_for_token(request_body.code) sso_access_token = token_data.get("access_token") if not sso_access_token: raise HTTPException(status_code=400, detail="未能获取访问令牌") # 2. 获取完整用户信息(含角色) user_info = await OAuthService.get_user_profile(sso_access_token) # 3. 同步用户到本地数据库 user = OAuthService.sync_user_from_oauth(user_info) # 4. 签发本地 JWT access_token = jwt_service.create_access_token( user_id=user.id, username=user.username, email=user.email, role=user.role, ) refresh_token = jwt_service.create_refresh_token(user_id=user.id) logger.info(f"Code exchange successful for user: {user.username}") return ExchangeCodeResponse( token=access_token, refresh_token=refresh_token, token_type="bearer", user={ "id": user.id, "username": user.username, "email": user.email, "role": user.role, "created_at": str(user.created_at), }, ) @router.post("/refresh") async def oauth_refresh(request_body: RefreshRequest): """ Token 刷新端点。 验证 refresh_token(本地 JWT),签发新的 access_token + refresh_token。 """ logger.info("Token refresh requested") try: payload = jwt_service.verify_token(request_body.refresh_token) if payload.get("type") != "refresh": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的 refresh token", ) user_id = payload.get("sub") # 从数据库获取用户信息 user = AuthService.get_current_user(user_id) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在", ) new_access_token = jwt_service.create_access_token( user_id=user.id, username=user.username, email=user.email, role=user.role, ) new_refresh_token = jwt_service.create_refresh_token(user_id=user.id) return { "token": new_access_token, "refresh_token": new_refresh_token, "token_type": "bearer", } except HTTPException: raise except Exception as e: logger.error(f"Token refresh unexpected error: {e}", exc_info=True) raise HTTPException( status_code=400, detail=f"Token 刷新失败: {str(e)}", ) @router.post("/logout") async def oauth_logout(request: Request): """ 登出端点。 可选通知 SSO 注销,返回统一认证平台登录页面 URL 供前端跳转。 """ auth_header = request.headers.get("Authorization", "") token = auth_header.replace("Bearer ", "") if auth_header.startswith("Bearer ") else None if token and settings.SSO_REVOKE_ENDPOINT: # 通知 SSO 注销 try: async with httpx.AsyncClient(timeout=5.0) as client: await client.post( f"{settings.SSO_BASE_URL}{settings.SSO_REVOKE_ENDPOINT}", headers={"Authorization": f"Bearer {token}"}, ) except Exception as e: logger.warning(f"SSO revoke failed (non-critical): {e}") return { "message": "登出成功", "logout_url": settings.SSO_LOGOUT_REDIRECT_URL, } @router.get("/me", response_model=UserResponse) async def get_current_user(request: Request): """ 获取当前认证用户信息。 用户信息由 AuthMiddleware 从 JWT 验证后填充到 request.state。 """ user_data = getattr(request.state, "user", None) if not user_data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证", ) user = AuthService.get_current_user(user_data["id"]) return UserResponse( id=user.id, username=user.username, email=user.email, role=user.role, created_at=str(user.created_at), ) @router.get("/status") async def oauth_status(): """获取 SSO 配置状态""" return { "enabled": settings.SSO_ENABLED, "provider": "SSO" if settings.SSO_ENABLED else None, "base_url": settings.SSO_BASE_URL if settings.SSO_ENABLED else None, }