| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
- """
- 用户登录认证服务
- """
- from typing import Dict, Any
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.auth.models.user import User
- from app.utils.security import verify_password, create_access_token, create_refresh_token
- from app.core.exceptions import AuthenticationError
class AuthService:
    """Authenticate users and issue JWT access/refresh tokens."""

    async def login(self, db: AsyncSession, username: str, password: str) -> Dict[str, Any]:
        """Log a user in with a username and password.

        Args:
            db: Async database session used to look the user up.
            username: Login name matched against ``User.username``.
            password: Plain-text password checked against the stored hash.

        Returns:
            A dict with ``access_token``, ``refresh_token`` and a ``user``
            dict holding the user's ``id`` (as a string) and ``username``.

        Raises:
            AuthenticationError: If no matching user exists, the password
                does not verify, or the user is not active.
        """
        result = await db.execute(select(User).where(User.username == username))
        user = result.scalar_one_or_none()

        # Unknown user and wrong password raise the same error, so the
        # message does not distinguish between the two cases.
        # NOTE(review): the hash check is skipped when the user is unknown,
        # so response time may still differ between the cases - confirm
        # whether that matters for this deployment.
        if not user or not verify_password(password, user.password_hash):
            raise AuthenticationError(message="用户名或密码错误")
        if not user.is_active:
            raise AuthenticationError(message="用户已禁用")

        # Both tokens are built from the same claims.
        token_data = {"sub": str(user.id), "username": user.username}
        access_token = create_access_token(token_data)
        refresh_token = create_refresh_token(token_data)
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "user": {"id": str(user.id), "username": user.username},
        }

    async def refresh(self, refresh_token: str) -> Dict[str, Any]:
        """Exchange a refresh token for a new access token.

        Args:
            refresh_token: Encoded refresh token previously issued at login.

        Returns:
            A dict with a single ``access_token`` entry.

        Raises:
            AuthenticationError: If the token does not verify, is not of
                type ``"refresh"``, or lacks the ``sub``/``username`` claims.
        """
        # Kept as a function-level import, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        from app.services.jwt_token import verify_token

        payload = verify_token(refresh_token)
        if not payload or payload.get("type") != "refresh":
            raise AuthenticationError(message="无效的刷新令牌")

        # A token missing the expected claims is an invalid token, not a
        # server error: report it the same way as any other bad token
        # instead of letting a KeyError escape.
        try:
            token_data = {"sub": payload["sub"], "username": payload["username"]}
        except KeyError as exc:
            raise AuthenticationError(message="无效的刷新令牌") from exc
        return {"access_token": create_access_token(token_data)}
# Module-level AuthService instance, created when this module is imported.
auth_service = AuthService()
|