""" 管理员认证服务 提供管理员登录、JWT Token 生成验证、登录失败锁定等功能 Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6 """ import os import hashlib from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from sqlalchemy import func from app.models.admin import AdminUser, AdminLoginAttempt from app.schemas.admin_schema import AdminLoginResponse # 密码哈希上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT 配置(管理员使用独立的密钥) ADMIN_SECRET_KEY = os.getenv("ADMIN_JWT_SECRET_KEY", "admin-secret-key-change-in-production") ALGORITHM = "HS256" ADMIN_TOKEN_EXPIRE_HOURS = 24 # 登录失败锁定配置 MAX_LOGIN_ATTEMPTS = 5 LOCKOUT_MINUTES = 30 class AdminAuthService: """管理员认证服务类""" def __init__(self, db: Session): self.db = db @staticmethod def hash_password_for_storage(password: str) -> str: """为存储准备密码哈希 - 先SHA256再bcrypt""" sha256_hash = hashlib.sha256(password.encode()).hexdigest() return pwd_context.hash(sha256_hash) @staticmethod def verify_password(password: str, hashed_password: str) -> bool: """验证密码 - 前端传来的是SHA256哈希""" return pwd_context.verify(password, hashed_password) @staticmethod def create_access_token(admin_id: int, expires_delta: Optional[timedelta] = None) -> tuple[str, datetime]: """生成 JWT 访问令牌,返回 (token, expires_at)""" expire = datetime.utcnow() + (expires_delta or timedelta(hours=ADMIN_TOKEN_EXPIRE_HOURS)) to_encode = {"admin_id": admin_id, "exp": expire} token = jwt.encode(to_encode, ADMIN_SECRET_KEY, algorithm=ALGORITHM) return token, expire @staticmethod def verify_token(token: str) -> dict: """验证 JWT 令牌并返回 payload""" return jwt.decode(token, ADMIN_SECRET_KEY, algorithms=[ALGORITHM]) def check_login_attempts(self, username: str) -> bool: """ 检查登录尝试次数,超过限制返回 False Requirements: 1.3 """ lockout_time = datetime.utcnow() - timedelta(minutes=LOCKOUT_MINUTES) failed_attempts = self.db.query(func.count(AdminLoginAttempt.id)).filter( AdminLoginAttempt.username == username, AdminLoginAttempt.success == False, AdminLoginAttempt.created_at >= lockout_time ).scalar() return failed_attempts < MAX_LOGIN_ATTEMPTS def record_login_attempt(self, username: str, success: bool, ip: str) -> None: """记录登录尝试""" attempt = AdminLoginAttempt( username=username, success=success, ip_address=ip ) self.db.add(attempt) self.db.commit() def login(self, username: str, password: str, ip: str) -> AdminLoginResponse: """ 管理员登录 Requirements: 1.1, 1.2, 1.3 """ from app.services.log_service import LogService log_service = LogService(self.db) # 检查是否被锁定 if not self.check_login_attempts(username): self.record_login_attempt(username, False, ip) log_service.log_login( user_id=username, user_type="admin", login_result="failed", fail_reason="账号已锁定", ip_address=ip ) raise ValueError("ACCOUNT_LOCKED") # 查找管理员 admin = self.db.query(AdminUser).filter(AdminUser.username == username).first() if not admin: self.record_login_attempt(username, False, ip) log_service.log_login( user_id=username, user_type="admin", login_result="failed", fail_reason="用户名或密码错误", ip_address=ip ) raise ValueError("AUTH_FAILED") # 检查管理员状态 if admin.status != "active": self.record_login_attempt(username, False, ip) log_service.log_login( user_id=admin.username, user_type="admin", login_result="failed", fail_reason="账号已禁用", ip_address=ip ) raise ValueError("ACCOUNT_DISABLED") # 验证密码 if not self.verify_password(password, admin.password_hash): self.record_login_attempt(username, False, ip) log_service.log_login( user_id=admin.username, user_type="admin", login_result="failed", fail_reason="用户名或密码错误", ip_address=ip ) raise ValueError("AUTH_FAILED") # 登录成功,记录并生成 Token self.record_login_attempt(username, True, ip) log_service.log_login( user_id=admin.username, user_type="admin", login_result="success", ip_address=ip ) token, expires_at = self.create_access_token(admin.id) return AdminLoginResponse( token=token, admin_id=admin.id, username=admin.username, nickname=admin.nickname, expires_at=expires_at ) def get_admin_by_id(self, admin_id: int) -> Optional[AdminUser]: """根据 ID 获取管理员""" return self.db.query(AdminUser).filter(AdminUser.id == admin_id).first()