| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- """
- 管理员认证服务
- 提供管理员登录、JWT Token 生成验证、登录失败锁定等功能
- Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6
- """
- import os
- import hashlib
- from datetime import datetime, timedelta
- from typing import Optional
- from jose import JWTError, jwt
- from passlib.context import CryptContext
- from sqlalchemy.orm import Session
- from sqlalchemy import func
- from app.models.admin import AdminUser, AdminLoginAttempt
- from app.schemas.admin_schema import AdminLoginResponse
# Password-hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT settings. Administrator tokens are signed with their own key, read from
# the ADMIN_JWT_SECRET_KEY environment variable.
# NOTE(review): when that variable is unset, the hard-coded fallback below is
# used as the signing key -- confirm every production deployment sets it.
ADMIN_SECRET_KEY = os.environ.get(
    "ADMIN_JWT_SECRET_KEY",
    "admin-secret-key-change-in-production",
)
ALGORITHM = "HS256"
ADMIN_TOKEN_EXPIRE_HOURS = 24  # default token lifetime, in hours

# Failed-login lockout settings.
MAX_LOGIN_ATTEMPTS = 5  # failed attempts tolerated inside the look-back window
LOCKOUT_MINUTES = 30  # length of the look-back window, in minutes
class AdminAuthService:
    """Authentication service for administrator accounts.

    Wraps a SQLAlchemy session and provides password hashing/verification,
    JWT creation/decoding, failed-login throttling and the login flow.
    """

    def __init__(self, db: Session):
        """Keep the SQLAlchemy session used for all queries and commits."""
        self.db = db

    @staticmethod
    def hash_password_for_storage(password: str) -> str:
        """Return the hash to store for a plaintext password.

        The plaintext is reduced to its SHA-256 hex digest first and that
        digest is then hashed with ``pwd_context``.
        """
        sha256_hash = hashlib.sha256(password.encode()).hexdigest()
        return pwd_context.hash(sha256_hash)

    @staticmethod
    def verify_password(password: str, hashed_password: str) -> bool:
        """Check a submitted password against a stored hash.

        Args:
            password: The value sent by the client. Per the original notes
                the front end already submits the SHA-256 digest, so it is
                passed to ``pwd_context`` unchanged.
            hashed_password: The stored hash to compare against.

        Returns:
            The result of ``pwd_context.verify``.
        """
        return pwd_context.verify(password, hashed_password)

    @staticmethod
    def create_access_token(admin_id: int, expires_delta: Optional[timedelta] = None) -> tuple[str, datetime]:
        """Create a signed JWT for an administrator.

        Args:
            admin_id: Stored in the token payload under ``"admin_id"``.
            expires_delta: Token lifetime. Any falsy value (``None`` or a
                zero ``timedelta``) falls back to ``ADMIN_TOKEN_EXPIRE_HOURS``.

        Returns:
            ``(token, expires_at)`` where ``expires_at`` is a naive UTC
            ``datetime``.
        """
        expire = datetime.utcnow() + (expires_delta or timedelta(hours=ADMIN_TOKEN_EXPIRE_HOURS))
        to_encode = {"admin_id": admin_id, "exp": expire}
        token = jwt.encode(to_encode, ADMIN_SECRET_KEY, algorithm=ALGORITHM)
        return token, expire

    @staticmethod
    def verify_token(token: str) -> dict:
        """Decode a JWT with the admin key and return its payload.

        Any error raised by ``jwt.decode`` (for example for an invalid or
        expired token) propagates to the caller.
        """
        return jwt.decode(token, ADMIN_SECRET_KEY, algorithms=[ALGORITHM])

    def check_login_attempts(self, username: str) -> bool:
        """Tell whether ``username`` may still attempt to log in.

        Counts the failed attempts recorded for the username during the last
        ``LOCKOUT_MINUTES`` minutes.

        Requirements: 1.3

        Returns:
            ``True`` while that count is below ``MAX_LOGIN_ATTEMPTS``,
            ``False`` once the limit has been reached.
        """
        lockout_time = datetime.utcnow() - timedelta(minutes=LOCKOUT_MINUTES)
        failed_attempts = self.db.query(func.count(AdminLoginAttempt.id)).filter(
            AdminLoginAttempt.username == username,
            # ``== False`` builds a SQL expression here; it is not a Python
            # truthiness test, so it must not be rewritten as ``not ...``.
            AdminLoginAttempt.success == False,  # noqa: E712
            AdminLoginAttempt.created_at >= lockout_time,
        ).scalar()
        return failed_attempts < MAX_LOGIN_ATTEMPTS

    def record_login_attempt(self, username: str, success: bool, ip: str) -> None:
        """Persist one login attempt and commit it immediately.

        Args:
            username: The username that was submitted.
            success: Whether the attempt succeeded.
            ip: Stored as the attempt's ``ip_address``.

        Raises:
            Exception: Whatever the commit raised, re-raised after the
                session has been rolled back.
        """
        attempt = AdminLoginAttempt(
            username=username,
            success=success,
            ip_address=ip,
        )
        self.db.add(attempt)
        try:
            self.db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is rolled
            # back; restore it before surfacing the error to the caller.
            self.db.rollback()
            raise

    def _log_failed_attempt(
        self,
        log_service,
        username: str,
        ip: str,
        fail_reason: str,
        admin: Optional[AdminUser] = None,
    ) -> None:
        """Record a failed attempt and write the matching login-log entry.

        The log entry's ``user_id`` is ``admin.username`` when an admin row
        was found and the submitted ``username`` otherwise. It is read only
        after the attempt has been recorded, matching the original ordering.
        """
        self.record_login_attempt(username, False, ip)
        log_service.log_login(
            user_id=username if admin is None else admin.username,
            user_type="admin",
            login_result="failed",
            fail_reason=fail_reason,
            ip_address=ip,
        )

    def login(self, username: str, password: str, ip: str) -> AdminLoginResponse:
        """Authenticate an administrator and issue an access token.

        Every attempt, successful or not, is recorded through
        ``record_login_attempt`` and written to the login log.

        Requirements: 1.1, 1.2, 1.3

        Args:
            username: Submitted username.
            password: Submitted password, forwarded to ``verify_password``.
            ip: Client address stored with the attempt and the log entry.

        Returns:
            An ``AdminLoginResponse`` carrying the token, the admin's id,
            username and nickname, and the token expiry.

        Raises:
            ValueError: ``"ACCOUNT_LOCKED"`` when too many recent failures
                exist, ``"AUTH_FAILED"`` when the username is unknown or the
                password does not match, ``"ACCOUNT_DISABLED"`` when the
                admin's status is not ``"active"``.
        """
        # Kept function-local as in the original; presumably this avoids a
        # circular import -- confirm before hoisting to module level.
        from app.services.log_service import LogService

        log_service = LogService(self.db)

        # Locked out: the rejected attempt is itself recorded as a failure.
        if not self.check_login_attempts(username):
            self._log_failed_attempt(log_service, username, ip, "账号已锁定")
            raise ValueError("ACCOUNT_LOCKED")

        admin = self.db.query(AdminUser).filter(AdminUser.username == username).first()
        if not admin:
            self._log_failed_attempt(log_service, username, ip, "用户名或密码错误")
            raise ValueError("AUTH_FAILED")

        # NOTE(review): the disabled check runs before the password check, so
        # the error code distinguishes disabled accounts from unknown ones
        # without a valid password -- confirm this is intended.
        if admin.status != "active":
            self._log_failed_attempt(log_service, username, ip, "账号已禁用", admin)
            raise ValueError("ACCOUNT_DISABLED")

        if not self.verify_password(password, admin.password_hash):
            self._log_failed_attempt(log_service, username, ip, "用户名或密码错误", admin)
            raise ValueError("AUTH_FAILED")

        # Success: record the attempt, log it, then issue the token.
        self.record_login_attempt(username, True, ip)
        log_service.log_login(
            user_id=admin.username,
            user_type="admin",
            login_result="success",
            ip_address=ip,
        )
        token, expires_at = self.create_access_token(admin.id)
        return AdminLoginResponse(
            token=token,
            admin_id=admin.id,
            username=admin.username,
            nickname=admin.nickname,
            expires_at=expires_at,
        )

    def get_admin_by_id(self, admin_id: int) -> Optional[AdminUser]:
        """Return the admin with the given id, or ``None`` if none exists."""
        return self.db.query(AdminUser).filter(AdminUser.id == admin_id).first()
|