# admin_auth_service.py
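"""
Administrator authentication service.

Provides administrator login, JWT token generation and verification, and
account lockout after repeated failed logins.

Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6
"""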
  6. import os
  7. import hashlib
  8. from datetime import datetime, timedelta
  9. from typing import Optional
  10. from jose import JWTError, jwt
  11. from passlib.context import CryptContext
  12. from sqlalchemy.orm import Session
  13. from sqlalchemy import func
  14. from app.models.admin import AdminUser, AdminLoginAttempt
  15. from app.schemas.admin_schema import AdminLoginResponse
  16. # 密码哈希上下文
  17. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  18. # JWT 配置(管理员使用独立的密钥)
  19. ADMIN_SECRET_KEY = os.getenv("ADMIN_JWT_SECRET_KEY", "admin-secret-key-change-in-production")
  20. ALGORITHM = "HS256"
  21. ADMIN_TOKEN_EXPIRE_HOURS = 24
  22. # 登录失败锁定配置
  23. MAX_LOGIN_ATTEMPTS = 5
  24. LOCKOUT_MINUTES = 30
  25. class AdminAuthService:
  26. """管理员认证服务类"""
  27. def __init__(self, db: Session):
  28. self.db = db
  29. @staticmethod
  30. def hash_password_for_storage(password: str) -> str:
  31. """为存储准备密码哈希 - 先SHA256再bcrypt"""
  32. sha256_hash = hashlib.sha256(password.encode()).hexdigest()
  33. return pwd_context.hash(sha256_hash)
  34. @staticmethod
  35. def verify_password(password: str, hashed_password: str) -> bool:
  36. """验证密码 - 前端传来的是SHA256哈希"""
  37. return pwd_context.verify(password, hashed_password)
  38. @staticmethod
  39. def create_access_token(admin_id: int, expires_delta: Optional[timedelta] = None) -> tuple[str, datetime]:
  40. """生成 JWT 访问令牌,返回 (token, expires_at)"""
  41. expire = datetime.utcnow() + (expires_delta or timedelta(hours=ADMIN_TOKEN_EXPIRE_HOURS))
  42. to_encode = {"admin_id": admin_id, "exp": expire}
  43. token = jwt.encode(to_encode, ADMIN_SECRET_KEY, algorithm=ALGORITHM)
  44. return token, expire
  45. @staticmethod
  46. def verify_token(token: str) -> dict:
  47. """验证 JWT 令牌并返回 payload"""
  48. return jwt.decode(token, ADMIN_SECRET_KEY, algorithms=[ALGORITHM])
  49. def check_login_attempts(self, username: str) -> bool:
  50. """
  51. 检查登录尝试次数,超过限制返回 False
  52. Requirements: 1.3
  53. """
  54. lockout_time = datetime.utcnow() - timedelta(minutes=LOCKOUT_MINUTES)
  55. failed_attempts = self.db.query(func.count(AdminLoginAttempt.id)).filter(
  56. AdminLoginAttempt.username == username,
  57. AdminLoginAttempt.success == False,
  58. AdminLoginAttempt.created_at >= lockout_time
  59. ).scalar()
  60. return failed_attempts < MAX_LOGIN_ATTEMPTS
  61. def record_login_attempt(self, username: str, success: bool, ip: str) -> None:
  62. """记录登录尝试"""
  63. attempt = AdminLoginAttempt(
  64. username=username,
  65. success=success,
  66. ip_address=ip
  67. )
  68. self.db.add(attempt)
  69. self.db.commit()
  70. def login(self, username: str, password: str, ip: str) -> AdminLoginResponse:
  71. """
  72. 管理员登录
  73. Requirements: 1.1, 1.2, 1.3
  74. """
  75. from app.services.log_service import LogService
  76. log_service = LogService(self.db)
  77. # 检查是否被锁定
  78. if not self.check_login_attempts(username):
  79. self.record_login_attempt(username, False, ip)
  80. log_service.log_login(
  81. user_id=username,
  82. user_type="admin",
  83. login_result="failed",
  84. fail_reason="账号已锁定",
  85. ip_address=ip
  86. )
  87. raise ValueError("ACCOUNT_LOCKED")
  88. # 查找管理员
  89. admin = self.db.query(AdminUser).filter(AdminUser.username == username).first()
  90. if not admin:
  91. self.record_login_attempt(username, False, ip)
  92. log_service.log_login(
  93. user_id=username,
  94. user_type="admin",
  95. login_result="failed",
  96. fail_reason="用户名或密码错误",
  97. ip_address=ip
  98. )
  99. raise ValueError("AUTH_FAILED")
  100. # 检查管理员状态
  101. if admin.status != "active":
  102. self.record_login_attempt(username, False, ip)
  103. log_service.log_login(
  104. user_id=admin.username,
  105. user_type="admin",
  106. login_result="failed",
  107. fail_reason="账号已禁用",
  108. ip_address=ip
  109. )
  110. raise ValueError("ACCOUNT_DISABLED")
  111. # 验证密码
  112. if not self.verify_password(password, admin.password_hash):
  113. self.record_login_attempt(username, False, ip)
  114. log_service.log_login(
  115. user_id=admin.username,
  116. user_type="admin",
  117. login_result="failed",
  118. fail_reason="用户名或密码错误",
  119. ip_address=ip
  120. )
  121. raise ValueError("AUTH_FAILED")
  122. # 登录成功,记录并生成 Token
  123. self.record_login_attempt(username, True, ip)
  124. log_service.log_login(
  125. user_id=admin.username,
  126. user_type="admin",
  127. login_result="success",
  128. ip_address=ip
  129. )
  130. token, expires_at = self.create_access_token(admin.id)
  131. return AdminLoginResponse(
  132. token=token,
  133. admin_id=admin.id,
  134. username=admin.username,
  135. nickname=admin.nickname,
  136. expires_at=expires_at
  137. )
  138. def get_admin_by_id(self, admin_id: int) -> Optional[AdminUser]:
  139. """根据 ID 获取管理员"""
  140. return self.db.query(AdminUser).filter(AdminUser.id == admin_id).first()