| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- """
- JWT Service for token generation and validation.
- Handles creation and verification of access and refresh tokens.
- """
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

import jwt

from config import settings
class JWTService:
    """Service for JWT token operations.

    Tokens are signed with ``settings.JWT_SECRET_KEY`` using
    ``settings.JWT_ALGORITHM``. Each token carries a ``type`` claim
    (``"access"`` or ``"refresh"``) which :meth:`verify_token` compares
    against the type the caller expects.
    """

    @staticmethod
    def _build_token(claims: Dict, lifetime: timedelta, token_type: str) -> str:
        """Add ``exp``, ``iat`` and ``type`` to *claims* and sign the result."""
        # Read the clock once so ``exp`` is exactly ``iat + lifetime``.
        # A timezone-aware UTC timestamp is used because
        # ``datetime.utcnow()`` is deprecated since Python 3.12.
        issued_at = datetime.now(timezone.utc)
        payload = {
            **claims,
            "exp": issued_at + lifetime,
            "iat": issued_at,
            "type": token_type,
        }
        return jwt.encode(
            payload,
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM,
        )

    @staticmethod
    def create_access_token(user_data: Dict) -> str:
        """
        Create an access token.

        The token expires ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes
        after it is issued.

        Args:
            user_data: Dict with the keys ``id``, ``username``, ``email``
                and ``role``.

        Returns:
            Encoded JWT token string

        Raises:
            KeyError: A required key is missing from ``user_data``.
        """
        # NOTE(review): RFC 7519 defines ``sub`` as a string and recent PyJWT
        # releases appear to reject non-string subjects on decode. Confirm
        # that ``user_data["id"]`` is a str, or convert it here.
        claims = {
            "sub": user_data["id"],
            "username": user_data["username"],
            "email": user_data["email"],
            "role": user_data["role"],
        }
        return JWTService._build_token(
            claims,
            timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
            "access",
        )

    @staticmethod
    def create_refresh_token(user_data: Dict) -> str:
        """
        Create a refresh token.

        The token expires ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days after
        it is issued and carries only the subject claim.

        Args:
            user_data: Dict with the key ``id``.

        Returns:
            Encoded JWT token string

        Raises:
            KeyError: ``id`` is missing from ``user_data``.
        """
        # NOTE(review): same ``sub`` type question as in create_access_token.
        return JWTService._build_token(
            {"sub": user_data["id"]},
            timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS),
            "refresh",
        )

    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> Optional[Dict]:
        """
        Verify and decode JWT token.

        Args:
            token: JWT token string
            token_type: Expected token type (access or refresh)

        Returns:
            The decoded payload dict, or None when the token decodes
            successfully but its ``type`` claim differs from ``token_type``.

        Raises:
            jwt.ExpiredSignatureError: Token has expired
            jwt.InvalidTokenError: Token is invalid
        """
        # Decode errors are deliberately left to propagate to the caller.
        payload = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )

        # A valid token of the wrong kind (e.g. a refresh token presented
        # as an access token) is reported as None rather than an exception.
        if payload.get("type") != token_type:
            return None

        return payload
|