admin_auth.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. """
  2. 管理员认证依赖
  3. 提供管理员 JWT 令牌验证和管理员获取的依赖注入
  4. Requirements: 1.4, 1.5
  5. """
  6. from fastapi import Depends, HTTPException, status
  7. from fastapi.security import OAuth2PasswordBearer
  8. from jose import JWTError
  9. from sqlalchemy.orm import Session
  10. from app.database import get_db
  11. from app.models.admin import AdminUser
  12. from app.services.admin_auth_service import AdminAuthService
  13. # 管理员认证使用独立的 tokenUrl
  14. admin_oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/auth/login")
  15. def get_current_admin(
  16. token: str = Depends(admin_oauth2_scheme),
  17. db: Session = Depends(get_db)
  18. ) -> AdminUser:
  19. """从 JWT 令牌获取当前管理员"""
  20. credentials_exception = HTTPException(
  21. status_code=status.HTTP_401_UNAUTHORIZED,
  22. detail={"code": "TOKEN_INVALID", "message": "Token无效或已过期"},
  23. headers={"WWW-Authenticate": "Bearer"},
  24. )
  25. try:
  26. payload = AdminAuthService.verify_token(token)
  27. admin_id = payload.get("admin_id")
  28. if not admin_id:
  29. raise credentials_exception
  30. except JWTError:
  31. raise credentials_exception
  32. auth_service = AdminAuthService(db)
  33. admin = auth_service.get_admin_by_id(admin_id)
  34. if not admin:
  35. raise credentials_exception
  36. # 检查管理员状态
  37. if admin.status != "active":
  38. raise HTTPException(
  39. status_code=status.HTTP_403_FORBIDDEN,
  40. detail={"code": "ACCOUNT_DISABLED", "message": "账户已被禁用"}
  41. )
  42. return admin