admin_auth_router.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. """
  2. 管理员认证API路由
  3. 提供管理员登录的API端点
  4. Requirements: 1.1, 1.2, 1.3
  5. """
  6. from fastapi import APIRouter, Depends, HTTPException, status, Request
  7. from sqlalchemy.orm import Session
  8. from app.database import get_db
  9. from app.schemas.admin_schema import AdminLoginRequest, AdminLoginResponse, AdminInfo
  10. from app.services.admin_auth_service import AdminAuthService
  11. from app.dependencies.admin_auth import get_current_admin
  12. from app.models.admin import AdminUser
# Router that groups the admin authentication endpoints under the
# "/api/admin/auth" URL prefix.
router = APIRouter(prefix="/api/admin/auth", tags=["管理员认证"])
  14. # 错误码映射
  15. ERROR_MESSAGES = {
  16. "AUTH_FAILED": "用户名或密码错误",
  17. "ACCOUNT_LOCKED": "账户已被锁定,请30分钟后重试",
  18. "ACCOUNT_DISABLED": "账户已被禁用"
  19. }
  20. @router.post("/login", response_model=AdminLoginResponse)
  21. def admin_login(
  22. data: AdminLoginRequest,
  23. request: Request,
  24. db: Session = Depends(get_db)
  25. ):
  26. """管理员登录"""
  27. auth_service = AdminAuthService(db)
  28. # 获取客户端 IP
  29. ip = request.client.host if request.client else "unknown"
  30. try:
  31. response = auth_service.login(data.username, data.password, ip)
  32. return response
  33. except ValueError as e:
  34. error_code = str(e)
  35. message = ERROR_MESSAGES.get(error_code, "认证失败")
  36. if error_code == "ACCOUNT_LOCKED":
  37. raise HTTPException(
  38. status_code=status.HTTP_403_FORBIDDEN,
  39. detail={"code": error_code, "message": message}
  40. )
  41. else:
  42. raise HTTPException(
  43. status_code=status.HTTP_401_UNAUTHORIZED,
  44. detail={"code": error_code, "message": message},
  45. headers={"WWW-Authenticate": "Bearer"}
  46. )
  47. @router.get("/me", response_model=AdminInfo)
  48. def get_current_admin_info(
  49. current_admin: AdminUser = Depends(get_current_admin)
  50. ):
  51. """获取当前管理员信息"""
  52. return AdminInfo.model_validate(current_admin)