auth.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import jwt
  2. from fastapi import Depends, HTTPException, status
  3. from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
  4. from sqlalchemy import select
  5. from app.core.db import UserModel, async_session
  6. from app.core.security import decode_access_token
  7. security = HTTPBearer(auto_error=False)
  8. async def get_current_user(
  9. credentials: HTTPAuthorizationCredentials = Depends(security),
  10. ) -> dict:
  11. if not credentials:
  12. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
  13. try:
  14. payload = decode_access_token(credentials.credentials)
  15. if payload.get("type") != "access":
  16. raise HTTPException(status_code=401, detail="Invalid token type")
  17. except jwt.ExpiredSignatureError:
  18. raise HTTPException(status_code=401, detail="Token expired")
  19. except jwt.InvalidTokenError:
  20. raise HTTPException(status_code=401, detail="Invalid token")
  21. return payload
  22. async def get_current_active_user(
  23. current_user: dict = Depends(get_current_user),
  24. ) -> dict:
  25. user_id = current_user.get("sub")
  26. async with async_session() as session:
  27. result = await session.execute(select(UserModel).where(UserModel.id == user_id))
  28. user = result.scalar_one_or_none()
  29. if not user or not user.is_active:
  30. raise HTTPException(status_code=401, detail="User not found or inactive")
  31. return current_user