| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- """
- 认证API路由
- 提供用户注册、登录、Token验证和刷新的API端点
- 需求: 3.1, 4.1, 4.2, 4.3
- """
- from decimal import Decimal
- from fastapi import APIRouter, Depends, HTTPException, status, Request, Form
- from pydantic import BaseModel
- from sqlalchemy.orm import Session
- from jose import JWTError
- from typing import Annotated, Optional
- from app.services.data_encryption_service import encryption_service
- from app.database import get_db
- from app.schemas.user_schema import UserCreate, UserResponse, TokenResponse, TokenVerifyResponse, UserLogin
- from app.services.auth_service import AuthService
- from app.services.user_service import UserService
- from app.services.system_config_manager import get_config_bool, get_config_float, get_config_string
- from app.dependencies.auth import get_current_user, oauth2_scheme
- from app.models.user import User
- from app.services.token_revocation_service import token_revocation_service
- router = APIRouter(prefix="/api/auth", tags=["认证"])
- def get_real_password(raw_password: str, encrypted: bool = True) -> str:
- """
- 获取真实密码
-
- Args:
- raw_password: 原始密码(可能是加密的或明文的)
- encrypted: 是否已加密
-
- Returns:
- 解密后的明文密码
- """
- if not encrypted:
- # 明文密码,直接返回
- return raw_password
-
- # 加密密码,解密后返回
- try:
- return encryption_service.decrypt(raw_password)
- except Exception:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="账号或密码错误",
- headers={"WWW-Authenticate": "Bearer"}
- )
- @router.post("/register", response_model=UserResponse, summary="用户注册", description="创建新用户账号。前端调用时密码已加密,Swagger测试时使用明文密码需设置encrypted=false。")
- async def register(
- data: UserCreate,
- db: Session = Depends(get_db)
- ):
- """
- 用户注册
-
- **注意**: 非前端调用(如Swagger测试)时,请设置 `"encrypted": false` 并使用明文密码
- """
- # 有手机号时校验短信验证码(学校邮箱注册不需要)
- if data.phone:
- if not data.sms_code:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="请输入手机验证码"
- )
- from app.services.sms_service import sms_code_service
- ok = await sms_code_service.verify_code(data.phone, data.sms_code)
- if not ok:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="验证码错误或已过期"
- )
- # 有邮箱时校验邮箱验证码(如果提供了邮箱验证码)
- if data.email and data.email_code:
- from app.services.email_service import email_code_service
- ok = await email_code_service.verify_code(data.email.strip().lower(), data.email_code)
- if not ok:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="邮箱验证码错误或已过期"
- )
- data.password = get_real_password(data.password, data.encrypted)
-
- # 检查密码强度
- from app.services.password_strength_service import PasswordStrengthService
- from app.schemas.password_strength_schema import PasswordStrengthLevel
-
- strength_result = PasswordStrengthService.check_password_strength(data.password)
- if strength_result.strength == PasswordStrengthLevel.WEAK:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="密码强度太弱,请至少包含字母和数字两种字符"
- )
-
- # 检查注册开关
- if not get_config_bool("enable_registration", True):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="系统暂未开放注册"
- )
-
- user_service = UserService(db)
- user = user_service.create_user(data)
- return UserResponse.model_validate(user)
- @router.post("/login", summary="用户登录", description="使用用户名/手机号和密码登录。前端调用时密码已加密,Swagger测试时使用明文密码需设置encrypted=false。")
- async def login(
- credentials: UserLogin,
- request: Request,
- db: Session = Depends(get_db)
- ):
- """
- 用户登录
-
- **注意**: 非前端调用(如Swagger测试)时,请设置 `"encrypted": false` 并使用明文密码
- """
- from app.services.log_service import LogService
- from app.services.auth_service import AuthService
- username = credentials.username
- real_password = get_real_password(credentials.password, credentials.encrypted)
- ip_address = request.client.host if request and request.client else None
- user_agent = request.headers.get("user-agent") if request else None
- log_service = LogService(db)
- auth_service = AuthService(db)
- user = auth_service.authenticate_user(username, real_password)
-
- if not user:
- log_service.log_login(
- user_id=username,
- user_type="user",
- login_result="failed",
- fail_reason="用户名或密码错误",
- ip_address=ip_address,
- user_agent=user_agent
- )
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- log_service.log_login(
- user_id=user.id,
- user_type="user",
- login_result="success",
- ip_address=ip_address,
- user_agent=user_agent
- )
- access_token = AuthService.create_access_token(user.id)
- return TokenResponse(
- access_token=access_token,
- user=UserResponse.model_validate(user)
- )
- @router.get("/verify", response_model=TokenVerifyResponse)
- def verify_token(
- current_user: User = Depends(get_current_user),
- db: Session = Depends(get_db)
- ):
- """
- 验证Token是否有效
- 如果Token有效,返回用户信息;如果无效,返回401错误
- """
- return TokenVerifyResponse(
- valid=True,
- user=UserResponse.model_validate(current_user)
- )
- @router.post("/refresh", response_model=TokenResponse)
- def refresh_token(
- current_user: User = Depends(get_current_user),
- db: Session = Depends(get_db)
- ):
- """
- 刷新Token
- 使用当前有效的Token获取新的Token(延长有效期)
- """
- # 生成新的Token
- new_access_token = AuthService.create_access_token(current_user.id)
- return TokenResponse(
- access_token=new_access_token,
- user=UserResponse.model_validate(current_user)
- )
- @router.post("/logout")
- def logout(
- current_user: User = Depends(get_current_user),
- token: str = Depends(oauth2_scheme),
- ):
- """
- 用户登出
- 注意:由于JWT是无状态的,服务端不存储Token,
- 实际的Token失效需要客户端删除本地存储的Token。
- 此接口主要用于记录登出日志和未来可能的Token黑名单功能。
- """
- # 撤销当前 token,并使该用户已签发 token 全部失效。
- try:
- payload = AuthService.verify_token(token)
- token_revocation_service.revoke_payload(payload)
- token_revocation_service.revoke_user_sessions(current_user.id)
- except JWTError:
- pass
- return {"message": "登出成功", "user_id": current_user.id}
- @router.get("/features", summary="获取公开功能开关", tags=["认证"])
- def get_feature_flags():
- """
- 返回前端需要的功能开关(无需登录)
- """
- return {
- "enable_openclaw": get_config_bool("enable_openclaw", True),
- "enable_openclaw_client": get_config_bool("enable_openclaw_client", True),
- "enable_openclaw_web": get_config_bool("enable_openclaw_web", True),
- "openclaw_client_url": get_config_string("openclaw_client_url", ""),
- }
- # ==================== 手机号验证码登录 ====================
- class PhoneLoginRequest(BaseModel):
- phone: str
- sms_code: str
- @router.post("/login/phone", summary="手机号验证码登录")
- async def login_by_phone(
- body: PhoneLoginRequest,
- request: Request,
- db: Session = Depends(get_db)
- ):
- """手机号 + 验证码登录"""
- from app.services.sms_service import sms_code_service
- from app.services.log_service import LogService
- ip = request.client.host if request.client else None
- ok = await sms_code_service.verify_code(body.phone, body.sms_code)
- if not ok:
- raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="验证码错误或已过期")
- user = db.query(User).filter(User.phone == body.phone).first()
- if not user:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="该手机号未注册")
- if user.status != "active":
- raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="账户已被禁用")
- LogService(db).log_login(user_id=user.id, user_type="user", login_result="success", ip_address=ip)
- access_token = AuthService.create_access_token(user.id)
- return TokenResponse(access_token=access_token, user=UserResponse.model_validate(user))
- # ==================== 手机验证码重置密码 ====================
- class ResetPasswordByPhoneRequest(BaseModel):
- phone: str
- sms_code: str
- new_password: str
- encrypted: bool = True
- @router.post("/reset-password/phone", summary="手机验证码修改密码")
- async def reset_password_by_phone(
- body: ResetPasswordByPhoneRequest,
- db: Session = Depends(get_db)
- ):
- """验证手机验证码后修改密码"""
- from app.services.sms_service import sms_code_service
- ok = await sms_code_service.verify_code(body.phone, body.sms_code)
- if not ok:
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="验证码错误或已过期")
- user = db.query(User).filter(User.phone == body.phone).first()
- if not user:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="该手机号未注册")
- real_password = get_real_password(body.new_password, body.encrypted)
- from app.services.password_strength_service import PasswordStrengthService
- from app.schemas.password_strength_schema import PasswordStrengthLevel
- if PasswordStrengthService.check_password_strength(real_password).strength == PasswordStrengthLevel.WEAK:
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="密码强度太弱,请至少包含字母和数字两种字符")
- user.password_hash = AuthService.hash_password(real_password)
- db.commit()
- # 清除 bcrypt 登录缓存,防止旧密码在缓存过期前仍可登录
- try:
- from app.core.redis import redis_manager
- r = redis_manager.get_sync_client()
- if r:
- for key in r.keys(f"bcrypt_ok:{user.id}:*"):
- r.delete(key)
- except Exception:
- pass
- return {"code": 200, "message": "密码修改成功"}
- # ==================== 邮箱验证码重置密码 ====================
- class ResetPasswordByEmailRequest(BaseModel):
- email: str
- email_code: str
- new_password: str
- encrypted: bool = True
- @router.post("/reset-password/email", summary="邮箱验证码修改密码")
- async def reset_password_by_email(
- body: ResetPasswordByEmailRequest,
- db: Session = Depends(get_db)
- ):
- """验证邮箱验证码后修改密码"""
- from app.services.email_service import email_code_service
- email = body.email.strip().lower()
- ok = await email_code_service.verify_code(email, body.email_code)
- if not ok:
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="验证码错误或已过期")
- user = db.query(User).filter(User.email == email).first()
- if not user:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="该邮箱未绑定任何账户")
- real_password = get_real_password(body.new_password, body.encrypted)
- from app.services.password_strength_service import PasswordStrengthService
- from app.schemas.password_strength_schema import PasswordStrengthLevel
- if PasswordStrengthService.check_password_strength(real_password).strength == PasswordStrengthLevel.WEAK:
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="密码强度太弱,请至少包含字母和数字两种字符")
- user.password_hash = AuthService.hash_password(real_password)
- db.commit()
- # 清除 bcrypt 登录缓存
- try:
- from app.core.redis import redis_manager
- r = redis_manager.get_sync_client()
- if r:
- for key in r.keys(f"bcrypt_ok:{user.id}:*"):
- r.delete(key)
- except Exception:
- pass
- return {"code": 200, "message": "密码修改成功"}
|