| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- """
- OAuth 2.0 认证路由
- 处理 SSO 登录流程、token 刷新和用户信息查询。
- 所有认证统一走 SSO,不再本地签发 JWT。
- """
- import logging
- from fastapi import APIRouter, HTTPException, Query, Request, status
- from pydantic import BaseModel
- from typing import Optional
- from config import settings
- from services.oauth_service import OAuthService
- from services.auth_service import AuthService
- from middleware.auth_middleware import token_cache
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/api/oauth", tags=["oauth"])
- class OAuthLoginResponse(BaseModel):
- """OAuth 登录响应"""
- authorization_url: str
- state: str
- class SSOTokenResponse(BaseModel):
- """SSO Token 响应(透传 SSO 中心的 token)"""
- access_token: str
- refresh_token: str
- token_type: str = "bearer"
- user: dict
- class RefreshRequest(BaseModel):
- """Token 刷新请求"""
- refresh_token: str
- class UserResponse(BaseModel):
- """用户信息响应"""
- id: str
- username: str
- email: str
- role: str
- created_at: str
- @router.get("/login", response_model=OAuthLoginResponse)
- async def oauth_login():
- """
- 启动 OAuth 登录流程。
- 生成授权 URL 和 state 参数,前端需要保存 state 并重定向到授权 URL。
- """
- if not settings.OAUTH_ENABLED:
- raise HTTPException(status_code=503, detail="SSO 认证未配置")
- state = OAuthService.generate_state()
- authorization_url = OAuthService.get_authorization_url(state)
- return OAuthLoginResponse(
- authorization_url=authorization_url,
- state=state
- )
- @router.get("/callback")
- async def oauth_callback(
- code: str = Query(..., description="OAuth 授权码"),
- state: str = Query(..., description="State 参数"),
- ):
- """
- OAuth 回调端点。
- 用授权码换取 SSO token,获取用户信息并同步到本地数据库,
- 直接返回 SSO 的 access_token 和 refresh_token(不再本地签发 JWT)。
- """
- logger.info(f"OAuth callback received: code={code[:10]}..., state={state[:10]}...")
-
- if not settings.OAUTH_ENABLED:
- raise HTTPException(status_code=503, detail="SSO 认证未配置")
- try:
- # 1. 用授权码换取 SSO token
- logger.debug("Exchanging code for token...")
- token_data = await OAuthService.exchange_code_for_token(code)
- access_token = token_data.get("access_token")
- refresh_token = token_data.get("refresh_token", "")
- if not access_token:
- raise HTTPException(status_code=400, detail="未能获取访问令牌")
- # 2. 使用 SSO token 获取完整用户信息(含角色)
- logger.debug("Verifying SSO token and getting user info...")
- try:
- user_info = await OAuthService.verify_sso_token(access_token)
- logger.debug(f"User info: {user_info.get('username')}, role: {user_info.get('role')}")
- except HTTPException as e:
- logger.error(f"verify_sso_token failed: status={e.status_code}, detail={e.detail}")
- raise
- except Exception as e:
- logger.error(f"verify_sso_token unexpected error: {e}", exc_info=True)
- raise
- # 3. 同步用户到本地数据库(含角色映射)
- logger.debug("Syncing user to local database...")
- try:
- user = OAuthService.sync_user_from_oauth(user_info)
- logger.debug(f"User synced: id={user.id}, username={user.username}, role={user.role}")
- except Exception as e:
- logger.error(f"sync_user_from_oauth failed: {e}", exc_info=True)
- raise
- # 4. 缓存 token → 用户信息映射
- token_cache.set(access_token, user_info)
- # 5. 直接返回 SSO token(不再本地签发 JWT)
- logger.info(f"OAuth login successful for user: {user.username}")
- return SSOTokenResponse(
- access_token=access_token,
- refresh_token=refresh_token,
- token_type="bearer",
- user={
- "id": user.id,
- "username": user.username,
- "email": user.email,
- "role": user.role,
- "created_at": str(user.created_at)
- }
- )
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"OAuth callback error: {e}", exc_info=True)
- raise HTTPException(
- status_code=400,
- detail=f"OAuth 登录失败: {str(e)}"
- )
- @router.post("/refresh")
- async def oauth_refresh(request_body: RefreshRequest):
- """
- Token 刷新端点。
- 将 refresh 请求转发到 SSO 中心,返回新的 token。
- """
- logger.info(f"Token refresh requested, refresh_token={request_body.refresh_token[:20]}...")
-
- if not settings.OAUTH_ENABLED:
- raise HTTPException(status_code=503, detail="SSO 认证未配置")
- try:
- token_data = await OAuthService.refresh_sso_token(request_body.refresh_token)
- logger.info("Token refresh successful")
-
- return {
- "access_token": token_data.get("access_token"),
- "refresh_token": token_data.get("refresh_token", ""),
- "token_type": token_data.get("token_type", "bearer"),
- }
- except HTTPException as e:
- logger.error(f"Token refresh failed: status={e.status_code}, detail={e.detail}")
- raise
- except Exception as e:
- logger.error(f"Token refresh unexpected error: {e}", exc_info=True)
- raise HTTPException(
- status_code=400,
- detail=f"Token 刷新失败: {str(e)}"
- )
- @router.get("/me", response_model=UserResponse)
- async def get_current_user(request: Request):
- """
- 获取当前认证用户信息。
- 用户信息由 AuthMiddleware 从 SSO 验证后填充到 request.state。
- """
- user_data = getattr(request.state, "user", None)
- if not user_data:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="未认证"
- )
- # 从本地数据库获取完整用户信息
- user = AuthService.get_current_user(user_data["id"])
- return UserResponse(
- id=user.id,
- username=user.username,
- email=user.email,
- role=user.role,
- created_at=str(user.created_at)
- )
- @router.get("/status")
- async def oauth_status():
- """获取 OAuth 配置状态"""
- return {
- "enabled": settings.OAUTH_ENABLED,
- "provider": "SSO" if settings.OAUTH_ENABLED else None,
- "base_url": settings.OAUTH_BASE_URL if settings.OAUTH_ENABLED else None
- }
|