| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- """
- 认证服务层
- 提供密码哈希、JWT令牌生成和验证功能
- 需求: 2.1, 2.2, 2.3, 4.1, 4.4, 4.5, 5.1, 5.2, 5.3
- """
- import os
- import time
- import uuid
- from datetime import datetime, timedelta
- from typing import Optional
- from jose import JWTError, jwt
- from passlib.context import CryptContext
- from sqlalchemy.orm import Session
- from app.models.user import User
- # 密码哈希上下文
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- # JWT配置
- SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production")
- # 确保密钥是字符串类型
- SECRET_KEY = str(SECRET_KEY)
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv("JWT_EXPIRE_HOURS", "24"))
- class AuthService:
- """认证服务类"""
- def __init__(self, db: Session):
- self.db = db
- @staticmethod
- def hash_password(password: str) -> str:
- """使用bcrypt哈希密码"""
- return pwd_context.hash(password)
- @staticmethod
- def verify_password(plain_password: str, hashed_password: str) -> bool:
- """验证密码"""
- return pwd_context.verify(plain_password, hashed_password)
- @staticmethod
- def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
- """生成JWT访问令牌"""
- expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS))
- to_encode = {
- "user_id": user_id,
- "exp": expire,
- "iat": int(time.time()),
- "jti": str(uuid.uuid4()),
- }
- return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- @staticmethod
- def verify_token(token: str) -> dict:
- """验证JWT令牌并返回payload"""
- return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- def authenticate_user(self, username: str, password: str) -> Optional[User]:
- """
- 验证用户凭据。
- 优化:bcrypt 验证结果缓存 5 分钟(Redis key = bcrypt_ok:{user_id}:{password_hash_prefix})。
- 同一用户短时间内重复登录(如前端自动刷新 token)跳过 bcrypt,减少 CPU 压力。
- Redis 不可用时自动降级为每次全量 bcrypt 验证。
- """
- user = self.db.query(User).filter(User.username == username).first()
- if not user or not user.password_hash:
- return None
- # 尝试从 Redis 缓存读取验证结果
- # key 包含密码哈希前缀,确保密码修改后缓存自动失效
- cache_key = f"bcrypt_ok:{user.id}:{user.password_hash[:16]}"
- try:
- from app.core.redis import redis_manager
- r = redis_manager.get_sync_client()
- if r:
- cached_pwd = r.get(cache_key)
- if cached_pwd == password:
- return user # 缓存命中,跳过 bcrypt
- except Exception:
- pass
- # 缓存未命中,执行 bcrypt 验证
- if not self.verify_password(password, user.password_hash):
- return None
- # 验证成功,写入缓存(TTL 5 分钟)
- try:
- from app.core.redis import redis_manager
- r = redis_manager.get_sync_client()
- if r:
- r.setex(cache_key, 300, password)
- except Exception:
- pass
- return user
|