| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- """
- JWT 签发与验证服务
- 用于本地用户认证,替代 SSO token 透传方案
- """
- import jwt
- import logging
- from datetime import datetime, timedelta, timezone
- from fastapi import HTTPException, status
- from config import settings
- logger = logging.getLogger(__name__)
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
- REFRESH_TOKEN_EXPIRE_DAYS = 7
- def create_access_token(user_id: str, username: str, email: str, role: str) -> str:
- """创建 Access Token(30 分钟有效期)"""
- expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- payload = {
- "sub": user_id,
- "username": username,
- "email": email,
- "role": role,
- "exp": expire,
- "type": "access",
- }
- return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
- def create_refresh_token(user_id: str) -> str:
- """创建 Refresh Token(7 天有效期)"""
- expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
- payload = {
- "sub": user_id,
- "exp": expire,
- "type": "refresh",
- }
- return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
- def verify_token(token: str) -> dict:
- """
- 验证 JWT Token,返回 payload。
- 如果过期或无效,抛出 HTTPException。
- """
- try:
- payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
- return payload
- except jwt.ExpiredSignatureError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="令牌已过期,请重新登录",
- )
- except jwt.InvalidTokenError:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="无效的令牌",
- )
|