# jwt_service.py
"""
JWT issuing and verification service.

Used for local user authentication, replacing the SSO token pass-through approach.
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import HTTPException, status

from config import settings
  10. logger = logging.getLogger(__name__)
  11. ACCESS_TOKEN_EXPIRE_MINUTES = 30
  12. REFRESH_TOKEN_EXPIRE_DAYS = 7
  13. def create_access_token(user_id: str, username: str, email: str, role: str) -> str:
  14. """创建 Access Token(30 分钟有效期)"""
  15. expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  16. payload = {
  17. "sub": user_id,
  18. "username": username,
  19. "email": email,
  20. "role": role,
  21. "exp": expire,
  22. "type": "access",
  23. }
  24. return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
  25. def create_refresh_token(user_id: str) -> str:
  26. """创建 Refresh Token(7 天有效期)"""
  27. expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
  28. payload = {
  29. "sub": user_id,
  30. "exp": expire,
  31. "type": "refresh",
  32. }
  33. return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
  34. def verify_token(token: str) -> dict:
  35. """
  36. 验证 JWT Token,返回 payload。
  37. 如果过期或无效,抛出 HTTPException。
  38. """
  39. try:
  40. payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
  41. return payload
  42. except jwt.ExpiredSignatureError:
  43. raise HTTPException(
  44. status_code=status.HTTP_401_UNAUTHORIZED,
  45. detail="令牌已过期,请重新登录",
  46. )
  47. except jwt.InvalidTokenError:
  48. raise HTTPException(
  49. status_code=status.HTTP_401_UNAUTHORIZED,
  50. detail="无效的令牌",
  51. )