| 12345678910111213141516171819202122232425262728293031323334353637 |
- import jwt
- from fastapi import Depends, HTTPException, status
- from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
- from sqlalchemy import select
- from app.core.db import UserModel, async_session
- from app.core.security import decode_access_token
- security = HTTPBearer(auto_error=False)
- async def get_current_user(
- credentials: HTTPAuthorizationCredentials = Depends(security),
- ) -> dict:
- if not credentials:
- raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
- try:
- payload = decode_access_token(credentials.credentials)
- if payload.get("type") != "access":
- raise HTTPException(status_code=401, detail="Invalid token type")
- except jwt.ExpiredSignatureError:
- raise HTTPException(status_code=401, detail="Token expired")
- except jwt.InvalidTokenError:
- raise HTTPException(status_code=401, detail="Invalid token")
- return payload
- async def get_current_active_user(
- current_user: dict = Depends(get_current_user),
- ) -> dict:
- user_id = current_user.get("sub")
- async with async_session() as session:
- result = await session.execute(select(UserModel).where(UserModel.id == user_id))
- user = result.scalar_one_or_none()
- if not user or not user.is_active:
- raise HTTPException(status_code=401, detail="User not found or inactive")
- return current_user
|