| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- """
- OAuth 2.0 认证路由
- 处理 SSO 登录流程、本地 JWT 签发、token 刷新和用户信息查询。
- """
- import logging
- import httpx
- from fastapi import APIRouter, HTTPException, Request, status
- from pydantic import BaseModel
- from config import settings
- from services.oauth_service import OAuthService
- from services.auth_service import AuthService
- from services import jwt_service
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/api/oauth", tags=["oauth"])
class OAuthLoginResponse(BaseModel):
    """Response body of the OAuth login bootstrap endpoint."""

    # Authorization URL built for this login attempt.
    authorization_url: str
    # The state value that was generated together with the URL.
    state: str
class SSOTokenResponse(BaseModel):
    """SSO token response carrying locally issued JWTs."""

    # Access token.
    token: str
    # Token used to obtain a new access token later.
    refresh_token: str
    # Token scheme; defaults to "bearer".
    token_type: str = "bearer"
    # Free-form dictionary describing the authenticated user.
    user: dict
class ExchangeCodeRequest(BaseModel):
    """Request body for exchanging an authorization code."""

    # Authorization code to be exchanged for tokens.
    code: str
class ExchangeCodeResponse(BaseModel):
    """Response of the code-exchange endpoint (locally issued JWTs)."""

    # Locally issued access token.
    token: str
    # Locally issued refresh token.
    refresh_token: str
    # Token scheme; defaults to "bearer".
    token_type: str = "bearer"
    # User summary: id, username, email, role and created_at.
    user: dict
class RefreshRequest(BaseModel):
    """Request body of the token refresh endpoint."""

    # Refresh token to be verified before new tokens are issued.
    refresh_token: str
class UserResponse(BaseModel):
    """User information returned to the client."""

    # User identifier.
    id: str
    # Login name.
    username: str
    # E-mail address.
    email: str
    # Role name of the user.
    role: str
    # Creation timestamp rendered as a string.
    created_at: str
@router.get("/login", response_model=OAuthLoginResponse)
async def oauth_login():
    """Start the OAuth login flow.

    Generates a fresh ``state`` value and the matching authorization URL
    through ``OAuthService`` and returns both to the caller.

    Returns:
        OAuthLoginResponse: the authorization URL and the ``state`` value.
    """
    state = OAuthService.generate_state()
    authorization_url = OAuthService.get_authorization_url(state)

    logger.info("[OAuth] /api/oauth/login 被调用")
    # In OAuth 2.0 the state parameter serves as CSRF protection, so the full
    # value (and the URL that embeds it) is kept out of INFO-level logs.
    logger.debug("[OAuth] state: %s", state)
    logger.debug("[OAuth] authorization_url: %s", authorization_url)
    logger.info("[OAuth] SSO_BASE_URL: %s", settings.SSO_BASE_URL)
    logger.info("[OAuth] SSO_REDIRECT_URI: %s", settings.SSO_REDIRECT_URI)
    # Guard against an unset client id: a log statement must never be the
    # reason this endpoint fails.
    client_id_prefix = (settings.SSO_CLIENT_ID or "")[:10]
    logger.info("[OAuth] SSO_CLIENT_ID: %s...", client_id_prefix)

    return OAuthLoginResponse(
        authorization_url=authorization_url,
        state=state,
    )
@router.post("/exchange-code", response_model=ExchangeCodeResponse)
async def exchange_code(request_body: ExchangeCodeRequest):
    """Exchange an SSO authorization code for locally issued JWTs.

    Flow:
        1. Exchange the authorization code for an SSO access token.
        2. Fetch the user profile with that access token.
        3. Sync the user into the local database.
        4. Issue a local access token and refresh token.

    Args:
        request_body: Request payload holding the authorization ``code``.

    Returns:
        ExchangeCodeResponse: local access/refresh tokens plus a user summary.

    Raises:
        HTTPException: 400 when the code is empty or the SSO response carries
            no ``access_token``; 403 when a ``ValueError`` is raised during
            steps 1-3 (logged as a role-mapping failure); 500 for any other
            unexpected error.
    """
    code = request_body.code

    logger.info("=" * 60)
    logger.info("[OAuth] /api/oauth/exchange-code 请求到达")
    # Authorization codes are credentials: log only the length, never the value.
    logger.info("[OAuth] code 长度: %d", len(code) if code else 0)

    if not code:
        logger.error("[OAuth] 授权码为空!前端未传递 code 参数")
        raise HTTPException(status_code=400, detail="授权码为空,请检查前端是否正确传递 code 参数")

    try:
        # 1. Exchange the authorization code for an SSO access token.
        logger.info(
            "[OAuth] 开始向 SSO 换取 token, SSO地址: %s%s",
            settings.SSO_BASE_URL,
            settings.SSO_TOKEN_ENDPOINT,
        )
        token_data = await OAuthService.exchange_code_for_token(code)
        sso_access_token = token_data.get("access_token")
        if not sso_access_token:
            # Log only the field names; the values may contain other secrets.
            logger.error(
                "[OAuth] SSO 返回的 token 响应中没有 access_token, 响应字段: %s",
                list(token_data.keys()),
            )
            raise HTTPException(status_code=400, detail="未能获取访问令牌")
        logger.info("[OAuth] SSO token 获取成功")

        # 2. Fetch the user profile.
        logger.info("[OAuth] 开始获取用户信息")
        user_info = await OAuthService.get_user_profile(sso_access_token)
        # The profile may hold personal data, so only its field names are logged.
        logger.info("[OAuth] 用户信息字段: %s", list(user_info.keys()))

        # 3. Sync the user into the local database.
        logger.info(
            "[OAuth] 开始同步用户到本地数据库, username=%s",
            user_info.get("username"),
        )
        user = OAuthService.sync_user_from_oauth(user_info)
        logger.info(
            "[OAuth] 用户同步完成, user.id=%s, user.role=%s", user.id, user.role
        )
    except ValueError as e:
        # NOTE(review): any ValueError raised in steps 1-3 lands here, not only
        # role-mapping failures -- confirm this is intended.
        logger.error("[OAuth] 用户角色映射失败: %s", e)
        raise HTTPException(status_code=403, detail=str(e)) from e
    except HTTPException:
        raise
    except Exception as e:
        logger.error("[OAuth] 授权码交换异常: %s", e, exc_info=True)
        # NOTE(review): the exception text is returned to the client; kept
        # unchanged for frontend compatibility.
        raise HTTPException(status_code=500, detail=f"登录失败: {str(e)}") from e

    # 4. Issue the local JWTs.
    access_token = jwt_service.create_access_token(
        user_id=user.id,
        username=user.username,
        email=user.email,
        role=user.role,
    )
    refresh_token = jwt_service.create_refresh_token(user_id=user.id)

    logger.info("Code exchange successful for user: %s", user.username)
    return ExchangeCodeResponse(
        token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        user={
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "role": user.role,
            "created_at": str(user.created_at),
        },
    )
@router.post("/refresh")
async def oauth_refresh(request_body: RefreshRequest):
    """Refresh the local token pair.

    Verifies the supplied refresh token (a local JWT), loads the user named
    by its ``sub`` claim and issues a new access token and refresh token.

    Args:
        request_body: Request payload holding the ``refresh_token``.

    Returns:
        dict: ``token``, ``refresh_token`` and ``token_type``.

    Raises:
        HTTPException: 401 when the token payload is empty, is not of type
            ``refresh``, has no ``sub`` claim, or the user cannot be found;
            400 for any other error raised while refreshing.
    """
    logger.info("Token refresh requested")
    try:
        payload = jwt_service.verify_token(request_body.refresh_token)

        # An empty/None payload is treated like a wrong token type instead of
        # surfacing as an AttributeError through the generic handler below.
        if not payload or payload.get("type") != "refresh":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的 refresh token",
            )

        user_id = payload.get("sub")
        if not user_id:
            # Without a subject there is nobody to look up.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的 refresh token",
            )

        # Load the user from the database.
        user = AuthService.get_current_user(user_id)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户不存在",
            )

        new_access_token = jwt_service.create_access_token(
            user_id=user.id,
            username=user.username,
            email=user.email,
            role=user.role,
        )
        new_refresh_token = jwt_service.create_refresh_token(user_id=user.id)

        return {
            "token": new_access_token,
            "refresh_token": new_refresh_token,
            "token_type": "bearer",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Token refresh unexpected error: %s", e, exc_info=True)
        raise HTTPException(
            status_code=400,
            detail=f"Token 刷新失败: {str(e)}",
        ) from e
@router.post("/logout")
async def oauth_logout(request: Request):
    """Log the caller out.

    When a bearer token is present and ``settings.SSO_REVOKE_ENDPOINT`` is
    configured, the SSO revoke endpoint is notified on a best-effort basis.
    Failures there are logged and never fail the logout itself.

    Args:
        request: Incoming request; its ``Authorization`` header is inspected.

    Returns:
        dict: a success message and the SSO logout redirect URL.
    """
    auth_header = request.headers.get("Authorization", "")

    # Split off the scheme once instead of str.replace(), which would also
    # strip any later "Bearer " occurrence inside the credentials.
    scheme, _, credentials = auth_header.partition(" ")
    credentials = credentials.strip()
    token = credentials if scheme.lower() == "bearer" and credentials else None

    if token and settings.SSO_REVOKE_ENDPOINT:
        # Notify the SSO so it can revoke the session.
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.post(
                    f"{settings.SSO_BASE_URL}{settings.SSO_REVOKE_ENDPOINT}",
                    headers={"Authorization": f"Bearer {token}"},
                )
            if response.status_code >= 400:
                # Make a rejected revoke visible instead of ignoring it.
                logger.warning(
                    "SSO revoke returned HTTP %s (non-critical)",
                    response.status_code,
                )
        except Exception as e:
            logger.warning("SSO revoke failed (non-critical): %s", e)

    return {
        "message": "登出成功",
        "logout_url": settings.SSO_LOGOUT_REDIRECT_URL,
    }
@router.get("/me", response_model=UserResponse)
async def get_current_user(request: Request):
    """Return the currently authenticated user.

    Reads the user entry from ``request.state.user`` (expected to be filled
    in by the authentication middleware after JWT verification) and loads
    the matching user through ``AuthService``.

    Args:
        request: Incoming request carrying ``state.user``.

    Returns:
        UserResponse: id, username, email, role and creation time.

    Raises:
        HTTPException: 401 when the request is unauthenticated or the user
            no longer exists.
    """
    user_data = getattr(request.state, "user", None)
    if not user_data:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未认证",
        )

    user = AuthService.get_current_user(user_data["id"])
    if not user:
        # Same handling as the refresh endpoint: a token whose user is gone
        # is answered with 401 instead of an AttributeError-driven 500.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在",
        )

    return UserResponse(
        id=user.id,
        username=user.username,
        email=user.email,
        role=user.role,
        created_at=str(user.created_at),
    )
@router.get("/status")
async def oauth_status():
    """Report the SSO configuration status.

    Returns:
        dict: ``enabled`` mirrors ``settings.SSO_ENABLED``; ``provider`` and
        ``base_url`` are filled only when SSO is enabled, otherwise ``None``.
    """
    sso_enabled = settings.SSO_ENABLED

    provider = None
    base_url = None
    if sso_enabled:
        provider = "SSO"
        base_url = settings.SSO_BASE_URL

    return {
        "enabled": sso_enabled,
        "provider": provider,
        "base_url": base_url,
    }
|