""" 认证服务层 提供密码哈希、JWT令牌生成和验证功能 需求: 2.1, 2.2, 2.3, 4.1, 4.4, 4.5, 5.1, 5.2, 5.3 """ import os import time import uuid from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from app.models.user import User # 密码哈希上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT配置 SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production") # 确保密钥是字符串类型 SECRET_KEY = str(SECRET_KEY) ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv("JWT_EXPIRE_HOURS", "24")) class AuthService: """认证服务类""" def __init__(self, db: Session): self.db = db @staticmethod def hash_password(password: str) -> str: """使用bcrypt哈希密码""" return pwd_context.hash(password) @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return pwd_context.verify(plain_password, hashed_password) @staticmethod def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str: """生成JWT访问令牌""" expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)) to_encode = { "user_id": user_id, "exp": expire, "iat": int(time.time()), "jti": str(uuid.uuid4()), } return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) @staticmethod def verify_token(token: str) -> dict: """验证JWT令牌并返回payload""" return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) def authenticate_user(self, username: str, password: str) -> Optional[User]: """ 验证用户凭据。 优化:bcrypt 验证结果缓存 5 分钟(Redis key = bcrypt_ok:{user_id}:{password_hash_prefix})。 同一用户短时间内重复登录(如前端自动刷新 token)跳过 bcrypt,减少 CPU 压力。 Redis 不可用时自动降级为每次全量 bcrypt 验证。 """ user = self.db.query(User).filter(User.username == username).first() if not user or not user.password_hash: return None # 尝试从 Redis 缓存读取验证结果 # key 包含密码哈希前缀,确保密码修改后缓存自动失效 cache_key = f"bcrypt_ok:{user.id}:{user.password_hash[:16]}" try: from app.core.redis import redis_manager r = redis_manager.get_sync_client() if r: cached_pwd = r.get(cache_key) if cached_pwd == password: return user # 缓存命中,跳过 bcrypt except Exception: pass # 缓存未命中,执行 bcrypt 验证 if not self.verify_password(password, user.password_hash): return None # 验证成功,写入缓存(TTL 5 分钟) try: from app.core.redis import redis_manager r = redis_manager.get_sync_client() if r: r.setex(cache_key, 300, password) except Exception: pass return user