""" 用户登录认证服务 """ from typing import Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.auth.models.user import User from app.utils.security import verify_password, create_access_token, create_refresh_token from app.core.exceptions import AuthenticationError class AuthService: async def login(self, db: AsyncSession, username: str, password: str) -> Dict[str, Any]: """用户登录""" result = await db.execute(select(User).where(User.username == username)) user = result.scalar_one_or_none() if not user or not verify_password(password, user.password_hash): raise AuthenticationError(message="用户名或密码错误") if not user.is_active: raise AuthenticationError(message="用户已禁用") token_data = {"sub": str(user.id), "username": user.username} access_token = create_access_token(token_data) refresh_token = create_refresh_token(token_data) return { "access_token": access_token, "refresh_token": refresh_token, "user": {"id": str(user.id), "username": user.username}, } async def refresh(self, refresh_token: str) -> Dict[str, Any]: """刷新访问令牌""" from app.services.jwt_token import verify_token payload = verify_token(refresh_token) if not payload or payload.get("type") != "refresh": raise AuthenticationError(message="无效的刷新令牌") token_data = {"sub": payload["sub"], "username": payload["username"]} return {"access_token": create_access_token(token_data)} auth_service = AuthService()