| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- """
- 管理员认证依赖
- 提供管理员 JWT 令牌验证和管理员获取的依赖注入
- Requirements: 1.4, 1.5
- """
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from jose import JWTError
- from sqlalchemy.orm import Session
- from app.database import get_db
- from app.models.admin import AdminUser
- from app.services.admin_auth_service import AdminAuthService
- # 管理员认证使用独立的 tokenUrl
- admin_oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/auth/login")
- def get_current_admin(
- token: str = Depends(admin_oauth2_scheme),
- db: Session = Depends(get_db)
- ) -> AdminUser:
- """从 JWT 令牌获取当前管理员"""
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail={"code": "TOKEN_INVALID", "message": "Token无效或已过期"},
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = AdminAuthService.verify_token(token)
- admin_id = payload.get("admin_id")
- if not admin_id:
- raise credentials_exception
- except JWTError:
- raise credentials_exception
- auth_service = AdminAuthService(db)
- admin = auth_service.get_admin_by_id(admin_id)
- if not admin:
- raise credentials_exception
-
- # 检查管理员状态
- if admin.status != "active":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail={"code": "ACCOUNT_DISABLED", "message": "账户已被禁用"}
- )
-
- return admin
|