""" JWT 签发与验证服务 用于本地用户认证,替代 SSO token 透传方案 """ import jwt import logging from datetime import datetime, timedelta, timezone from fastapi import HTTPException, status from config import settings logger = logging.getLogger(__name__) ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7 def create_access_token(user_id: str, username: str, email: str, role: str) -> str: """创建 Access Token(30 分钟有效期)""" expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) payload = { "sub": user_id, "username": username, "email": email, "role": role, "exp": expire, "type": "access", } return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def create_refresh_token(user_id: str) -> str: """创建 Refresh Token(7 天有效期)""" expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) payload = { "sub": user_id, "exp": expire, "type": "refresh", } return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def verify_token(token: str) -> dict: """ 验证 JWT Token,返回 payload。 如果过期或无效,抛出 HTTPException。 """ try: payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) return payload except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="令牌已过期,请重新登录", ) except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的令牌", )