auth_service.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. """
  2. 用户登录认证服务
  3. """
  4. from typing import Dict, Any
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from sqlalchemy import select
  7. from app.auth.models.user import User
  8. from app.utils.security import verify_password, create_access_token, create_refresh_token
  9. from app.core.exceptions import AuthenticationError
  10. class AuthService:
  11. async def login(self, db: AsyncSession, username: str, password: str) -> Dict[str, Any]:
  12. """用户登录"""
  13. result = await db.execute(select(User).where(User.username == username))
  14. user = result.scalar_one_or_none()
  15. if not user or not verify_password(password, user.password_hash):
  16. raise AuthenticationError(message="用户名或密码错误")
  17. if not user.is_active:
  18. raise AuthenticationError(message="用户已禁用")
  19. token_data = {"sub": str(user.id), "username": user.username}
  20. access_token = create_access_token(token_data)
  21. refresh_token = create_refresh_token(token_data)
  22. return {
  23. "access_token": access_token,
  24. "refresh_token": refresh_token,
  25. "user": {"id": str(user.id), "username": user.username},
  26. }
  27. async def refresh(self, refresh_token: str) -> Dict[str, Any]:
  28. """刷新访问令牌"""
  29. from app.services.jwt_token import verify_token
  30. payload = verify_token(refresh_token)
  31. if not payload or payload.get("type") != "refresh":
  32. raise AuthenticationError(message="无效的刷新令牌")
  33. token_data = {"sub": payload["sub"], "username": payload["username"]}
  34. return {"access_token": create_access_token(token_data)}
  35. auth_service = AuthService()