| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- """
- OAuth 2.0 认证路由
- 处理 OAuth 登录流程
- """
- from fastapi import APIRouter, HTTPException, Query
- from fastapi.responses import RedirectResponse
- from pydantic import BaseModel
- from typing import Optional
- from config import settings
- from services.oauth_service import OAuthService
- from services.jwt_service import JWTService
- from schemas.auth import TokenResponse, UserResponse
# Router that groups every OAuth endpoint under the /api/oauth prefix.
router = APIRouter(
    prefix="/api/oauth",
    tags=["oauth"],
)
class OAuthLoginResponse(BaseModel):
    """Response payload returned when an OAuth login is started."""

    # URL the client should be redirected to in order to authorize.
    authorization_url: str
    # State value generated for this login attempt.
    state: str
@router.get("/login", response_model=OAuthLoginResponse)
async def oauth_login():
    """
    Start the OAuth login flow.

    Generates a state value and the matching authorization URL. The
    frontend is expected to keep the state and redirect the user to the
    authorization URL.

    Returns:
        OAuthLoginResponse carrying the authorization URL and the state.

    Raises:
        HTTPException: 400 when OAuth login is disabled in the settings.
    """
    if settings.OAUTH_ENABLED:
        # The state is generated first because the URL is built from it.
        login_state = OAuthService.generate_state()
        return OAuthLoginResponse(
            authorization_url=OAuthService.get_authorization_url(login_state),
            state=login_state,
        )

    raise HTTPException(status_code=400, detail="OAuth 登录未启用")
@router.get("/callback", response_model=TokenResponse)
async def oauth_callback(
    code: str = Query(..., description="OAuth 授权码"),
    state: str = Query(..., description="State 参数"),
):
    """
    OAuth callback endpoint.

    Exchanges the authorization code for an access token, uses that token
    to fetch the user's info, syncs the user through
    ``OAuthService.sync_user_from_oauth`` and issues local JWT tokens.

    Args:
        code: OAuth authorization code.
        state: State parameter. It is not checked in this handler; the
            frontend is expected to validate it.

    Returns:
        TokenResponse with the local access/refresh tokens and user info.

    Raises:
        HTTPException: 400 when OAuth login is disabled, when the token
            response has no access token, or when any step fails with a
            non-HTTP error. An HTTPException raised by one of the steps is
            propagated unchanged.
    """
    if not settings.OAUTH_ENABLED:
        raise HTTPException(status_code=400, detail="OAuth 登录未启用")

    # NOTE(review): `state` is never compared against a server-side value
    # here, so CSRF protection relies entirely on the frontend check.

    try:
        # 1. Exchange the authorization code for an access token.
        token_data = await OAuthService.exchange_code_for_token(code)
        access_token = token_data.get("access_token")

        if not access_token:
            raise HTTPException(status_code=400, detail="未能获取访问令牌")

        # 2. Fetch the user's info with the access token.
        oauth_user_info = await OAuthService.get_user_info(access_token)

        # 3. Sync the OAuth user into the local user record.
        user = OAuthService.sync_user_from_oauth(oauth_user_info)

        # 4. Build the claims and issue local JWT tokens.
        user_data = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "role": user.role,
        }

        jwt_access_token = JWTService.create_access_token(user_data)
        jwt_refresh_token = JWTService.create_refresh_token(user_data)

        # 5. Return the tokens together with the user info.
        return TokenResponse(
            access_token=jwt_access_token,
            refresh_token=jwt_refresh_token,
            token_type="bearer",
            user=UserResponse(
                id=user.id,
                username=user.username,
                email=user.email,
                role=user.role,
                created_at=user.created_at,
            ),
        )

    except HTTPException:
        # HTTP errors raised on purpose inside the try block must reach the
        # client as-is. Without this clause the generic handler below would
        # re-wrap them, garbling the detail and forcing the status to 400.
        raise
    except Exception as e:
        # Any other failure is reported as a 400; the cause is chained so
        # the original traceback stays available in server logs.
        raise HTTPException(
            status_code=400,
            detail=f"OAuth 登录失败: {str(e)}",
        ) from e
@router.get("/status")
async def oauth_status():
    """
    Report the OAuth configuration status.

    Returns:
        A dict with the keys "enabled", "provider" and "base_url". The
        last two are None when OAuth is disabled.
    """
    enabled = settings.OAUTH_ENABLED

    # Provider details are only exposed while OAuth is switched on.
    if enabled:
        provider = "SSO"
        base_url = settings.OAUTH_BASE_URL
    else:
        provider = None
        base_url = None

    return {
        "enabled": enabled,
        "provider": provider,
        "base_url": base_url,
    }
|