auth_service.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. """
  2. 认证服务层
  3. 提供密码哈希、JWT令牌生成和验证功能
  4. 需求: 2.1, 2.2, 2.3, 4.1, 4.4, 4.5, 5.1, 5.2, 5.3
  5. """
  6. import os
  7. import time
  8. import uuid
  9. from datetime import datetime, timedelta
  10. from typing import Optional
  11. from jose import JWTError, jwt
  12. from passlib.context import CryptContext
  13. from sqlalchemy.orm import Session
  14. from app.models.user import User
  15. # 密码哈希上下文
  16. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  17. # JWT配置
  18. SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production")
  19. # 确保密钥是字符串类型
  20. SECRET_KEY = str(SECRET_KEY)
  21. ALGORITHM = "HS256"
  22. ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv("JWT_EXPIRE_HOURS", "24"))
  23. class AuthService:
  24. """认证服务类"""
  25. def __init__(self, db: Session):
  26. self.db = db
  27. @staticmethod
  28. def hash_password(password: str) -> str:
  29. """使用bcrypt哈希密码"""
  30. return pwd_context.hash(password)
  31. @staticmethod
  32. def verify_password(plain_password: str, hashed_password: str) -> bool:
  33. """验证密码"""
  34. return pwd_context.verify(plain_password, hashed_password)
  35. @staticmethod
  36. def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
  37. """生成JWT访问令牌"""
  38. expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS))
  39. to_encode = {
  40. "user_id": user_id,
  41. "exp": expire,
  42. "iat": int(time.time()),
  43. "jti": str(uuid.uuid4()),
  44. }
  45. return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  46. @staticmethod
  47. def verify_token(token: str) -> dict:
  48. """验证JWT令牌并返回payload"""
  49. return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  50. def authenticate_user(self, username: str, password: str) -> Optional[User]:
  51. """
  52. 验证用户凭据。
  53. 优化:bcrypt 验证结果缓存 5 分钟(Redis key = bcrypt_ok:{user_id}:{password_hash_prefix})。
  54. 同一用户短时间内重复登录(如前端自动刷新 token)跳过 bcrypt,减少 CPU 压力。
  55. Redis 不可用时自动降级为每次全量 bcrypt 验证。
  56. """
  57. user = self.db.query(User).filter(User.username == username).first()
  58. if not user or not user.password_hash:
  59. return None
  60. # 尝试从 Redis 缓存读取验证结果
  61. # key 包含密码哈希前缀,确保密码修改后缓存自动失效
  62. cache_key = f"bcrypt_ok:{user.id}:{user.password_hash[:16]}"
  63. try:
  64. from app.core.redis import redis_manager
  65. r = redis_manager.get_sync_client()
  66. if r:
  67. cached_pwd = r.get(cache_key)
  68. if cached_pwd == password:
  69. return user # 缓存命中,跳过 bcrypt
  70. except Exception:
  71. pass
  72. # 缓存未命中,执行 bcrypt 验证
  73. if not self.verify_password(password, user.password_hash):
  74. return None
  75. # 验证成功,写入缓存(TTL 5 分钟)
  76. try:
  77. from app.core.redis import redis_manager
  78. r = redis_manager.get_sync_client()
  79. if r:
  80. r.setex(cache_key, 300, password)
  81. except Exception:
  82. pass
  83. return user