""" 认证API路由 提供用户注册、登录、Token验证和刷新的API端点 需求: 3.1, 4.1, 4.2, 4.3 """ from decimal import Decimal from fastapi import APIRouter, Depends, HTTPException, status, Request, Form from pydantic import BaseModel from sqlalchemy.orm import Session from jose import JWTError from typing import Annotated, Optional from app.services.data_encryption_service import encryption_service from app.database import get_db from app.schemas.user_schema import UserCreate, UserResponse, TokenResponse, TokenVerifyResponse, UserLogin from app.services.auth_service import AuthService from app.services.user_service import UserService from app.services.system_config_manager import get_config_bool, get_config_float, get_config_string from app.dependencies.auth import get_current_user, oauth2_scheme from app.models.user import User from app.services.token_revocation_service import token_revocation_service router = APIRouter(prefix="/api/auth", tags=["认证"]) def get_real_password(raw_password: str, encrypted: bool = True) -> str: """ 获取真实密码 Args: raw_password: 原始密码(可能是加密的或明文的) encrypted: 是否已加密 Returns: 解密后的明文密码 """ if not encrypted: # 明文密码,直接返回 return raw_password # 加密密码,解密后返回 try: return encryption_service.decrypt(raw_password) except Exception: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="账号或密码错误", headers={"WWW-Authenticate": "Bearer"} ) @router.post("/register", response_model=UserResponse, summary="用户注册", description="创建新用户账号。前端调用时密码已加密,Swagger测试时使用明文密码需设置encrypted=false。") async def register( data: UserCreate, db: Session = Depends(get_db) ): """ 用户注册 **注意**: 非前端调用(如Swagger测试)时,请设置 `"encrypted": false` 并使用明文密码 """ # 有手机号时校验短信验证码(学校邮箱注册不需要) if data.phone: if not data.sms_code: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="请输入手机验证码" ) from app.services.sms_service import sms_code_service ok = await sms_code_service.verify_code(data.phone, data.sms_code) if not ok: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="验证码错误或已过期" ) # 有邮箱时校验邮箱验证码(如果提供了邮箱验证码) if data.email and data.email_code: from app.services.email_service import email_code_service ok = await email_code_service.verify_code(data.email.strip().lower(), data.email_code) if not ok: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="邮箱验证码错误或已过期" ) data.password = get_real_password(data.password, data.encrypted) # 检查密码强度 from app.services.password_strength_service import PasswordStrengthService from app.schemas.password_strength_schema import PasswordStrengthLevel strength_result = PasswordStrengthService.check_password_strength(data.password) if strength_result.strength == PasswordStrengthLevel.WEAK: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="密码强度太弱,请至少包含字母和数字两种字符" ) # 检查注册开关 if not get_config_bool("enable_registration", True): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="系统暂未开放注册" ) user_service = UserService(db) user = user_service.create_user(data) return UserResponse.model_validate(user) @router.post("/login", summary="用户登录", description="使用用户名/手机号和密码登录。前端调用时密码已加密,Swagger测试时使用明文密码需设置encrypted=false。") async def login( credentials: UserLogin, request: Request, db: Session = Depends(get_db) ): """ 用户登录 **注意**: 非前端调用(如Swagger测试)时,请设置 `"encrypted": false` 并使用明文密码 """ from app.services.log_service import LogService from app.services.auth_service import AuthService username = credentials.username real_password = get_real_password(credentials.password, credentials.encrypted) ip_address = request.client.host if request and request.client else None user_agent = request.headers.get("user-agent") if request else None log_service = LogService(db) auth_service = AuthService(db) user = auth_service.authenticate_user(username, real_password) if not user: log_service.log_login( user_id=username, user_type="user", login_result="failed", fail_reason="用户名或密码错误", ip_address=ip_address, user_agent=user_agent ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", headers={"WWW-Authenticate": "Bearer"}, ) log_service.log_login( user_id=user.id, user_type="user", login_result="success", ip_address=ip_address, user_agent=user_agent ) access_token = AuthService.create_access_token(user.id) return TokenResponse( access_token=access_token, user=UserResponse.model_validate(user) ) @router.get("/verify", response_model=TokenVerifyResponse) def verify_token( current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ 验证Token是否有效 如果Token有效,返回用户信息;如果无效,返回401错误 """ return TokenVerifyResponse( valid=True, user=UserResponse.model_validate(current_user) ) @router.post("/refresh", response_model=TokenResponse) def refresh_token( current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ 刷新Token 使用当前有效的Token获取新的Token(延长有效期) """ # 生成新的Token new_access_token = AuthService.create_access_token(current_user.id) return TokenResponse( access_token=new_access_token, user=UserResponse.model_validate(current_user) ) @router.post("/logout") def logout( current_user: User = Depends(get_current_user), token: str = Depends(oauth2_scheme), ): """ 用户登出 注意:由于JWT是无状态的,服务端不存储Token, 实际的Token失效需要客户端删除本地存储的Token。 此接口主要用于记录登出日志和未来可能的Token黑名单功能。 """ # 撤销当前 token,并使该用户已签发 token 全部失效。 try: payload = AuthService.verify_token(token) token_revocation_service.revoke_payload(payload) token_revocation_service.revoke_user_sessions(current_user.id) except JWTError: pass return {"message": "登出成功", "user_id": current_user.id} @router.get("/features", summary="获取公开功能开关", tags=["认证"]) def get_feature_flags(): """ 返回前端需要的功能开关(无需登录) """ return { "enable_openclaw": get_config_bool("enable_openclaw", True), "enable_openclaw_client": get_config_bool("enable_openclaw_client", True), "enable_openclaw_web": get_config_bool("enable_openclaw_web", True), "openclaw_client_url": get_config_string("openclaw_client_url", ""), } # ==================== 手机号验证码登录 ==================== class PhoneLoginRequest(BaseModel): phone: str sms_code: str @router.post("/login/phone", summary="手机号验证码登录") async def login_by_phone( body: PhoneLoginRequest, request: Request, db: Session = Depends(get_db) ): """手机号 + 验证码登录""" from app.services.sms_service import sms_code_service from app.services.log_service import LogService ip = request.client.host if request.client else None ok = await sms_code_service.verify_code(body.phone, body.sms_code) if not ok: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="验证码错误或已过期") user = db.query(User).filter(User.phone == body.phone).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="该手机号未注册") if user.status != "active": raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="账户已被禁用") LogService(db).log_login(user_id=user.id, user_type="user", login_result="success", ip_address=ip) access_token = AuthService.create_access_token(user.id) return TokenResponse(access_token=access_token, user=UserResponse.model_validate(user)) # ==================== 手机验证码重置密码 ==================== class ResetPasswordByPhoneRequest(BaseModel): phone: str sms_code: str new_password: str encrypted: bool = True @router.post("/reset-password/phone", summary="手机验证码修改密码") async def reset_password_by_phone( body: ResetPasswordByPhoneRequest, db: Session = Depends(get_db) ): """验证手机验证码后修改密码""" from app.services.sms_service import sms_code_service ok = await sms_code_service.verify_code(body.phone, body.sms_code) if not ok: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="验证码错误或已过期") user = db.query(User).filter(User.phone == body.phone).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="该手机号未注册") real_password = get_real_password(body.new_password, body.encrypted) from app.services.password_strength_service import PasswordStrengthService from app.schemas.password_strength_schema import PasswordStrengthLevel if PasswordStrengthService.check_password_strength(real_password).strength == PasswordStrengthLevel.WEAK: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="密码强度太弱,请至少包含字母和数字两种字符") user.password_hash = AuthService.hash_password(real_password) db.commit() # 清除 bcrypt 登录缓存,防止旧密码在缓存过期前仍可登录 try: from app.core.redis import redis_manager r = redis_manager.get_sync_client() if r: for key in r.keys(f"bcrypt_ok:{user.id}:*"): r.delete(key) except Exception: pass return {"code": 200, "message": "密码修改成功"} # ==================== 邮箱验证码重置密码 ==================== class ResetPasswordByEmailRequest(BaseModel): email: str email_code: str new_password: str encrypted: bool = True @router.post("/reset-password/email", summary="邮箱验证码修改密码") async def reset_password_by_email( body: ResetPasswordByEmailRequest, db: Session = Depends(get_db) ): """验证邮箱验证码后修改密码""" from app.services.email_service import email_code_service email = body.email.strip().lower() ok = await email_code_service.verify_code(email, body.email_code) if not ok: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="验证码错误或已过期") user = db.query(User).filter(User.email == email).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="该邮箱未绑定任何账户") real_password = get_real_password(body.new_password, body.encrypted) from app.services.password_strength_service import PasswordStrengthService from app.schemas.password_strength_schema import PasswordStrengthLevel if PasswordStrengthService.check_password_strength(real_password).strength == PasswordStrengthLevel.WEAK: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="密码强度太弱,请至少包含字母和数字两种字符") user.password_hash = AuthService.hash_password(real_password) db.commit() # 清除 bcrypt 登录缓存 try: from app.core.redis import redis_manager r = redis_manager.get_sync_client() if r: for key in r.keys(f"bcrypt_ok:{user.id}:*"): r.delete(key) except Exception: pass return {"code": 200, "message": "密码修改成功"}