design.md 64 KB

Design Document

Overview

本文档描述了标注平台 JWT 权限认证体系的详细设计。该系统采用 FastAPI + JWT 的标准认证方案,使用中间件实现全局鉴权,并为未来的 OAuth 集成预留扩展接口。

设计目标

  1. 安全性:使用 bcrypt 加密密码,JWT 签名验证,防止未授权访问
  2. 可扩展性:服务层架构,便于未来集成 OAuth 认证中心
  3. 易用性:标准的 RESTful API,清晰的错误消息
  4. 性能:中间件高效验证,数据库索引优化
  5. 兼容性:与现有 API 无缝集成,最小化代码改动

技术栈

  • FastAPI: Web 框架
  • PyJWT: JWT token 生成和验证
  • bcrypt: 密码哈希加密
  • SQLite: 数据库(与现有系统一致)
  • Pydantic: 数据验证和序列化

Architecture

系统架构图

┌─────────────────────────────────────────────────────────────┐
│                        Frontend (React)                      │
│  - Login Form                                                │
│  - Token Storage (localStorage)                             │
│  - HTTP Interceptor (Auto attach token)                     │
└────────────────────────┬────────────────────────────────────┘
                         │ HTTP Request (Authorization: Bearer <token>)
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                     FastAPI Application                      │
│  ┌───────────────────────────────────────────────────────┐  │
│  │              Auth Middleware                          │  │
│  │  - Extract JWT from header                            │  │
│  │  - Verify signature & expiration                      │  │
│  │  - Attach user info to request                        │  │
│  │  - Skip public endpoints                              │  │
│  └───────────────────┬───────────────────────────────────┘  │
│                      │                                       │
│  ┌───────────────────▼───────────────────────────────────┐  │
│  │              Router Layer                             │  │
│  │  - /api/auth/*  (Auth Router)                         │  │
│  │  - /api/projects/*  (Protected)                       │  │
│  │  - /api/tasks/*  (Protected)                          │  │
│  │  - /api/annotations/*  (Protected)                    │  │
│  └───────────────────┬───────────────────────────────────┘  │
│                      │                                       │
│  ┌───────────────────▼───────────────────────────────────┐  │
│  │              Service Layer                            │  │
│  │  - AuthService (login, register, token)               │  │
│  │  - UserService (user CRUD)                            │  │
│  │  - JWTService (token generation & validation)         │  │
│  └───────────────────┬───────────────────────────────────┘  │
│                      │                                       │
│  ┌───────────────────▼───────────────────────────────────┐  │
│  │              Data Layer                               │  │
│  │  - User Model                                         │  │
│  │  - Database Connection                                │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                         │
                         ▼
                  ┌──────────────┐
                  │   SQLite DB  │
                  │  - users     │
                  └──────────────┘

认证流程

1. 用户注册流程

User → POST /api/auth/register
     → AuthService.register()
     → Validate input (username, email, password)
     → Check username/email uniqueness
     → Hash password with bcrypt
     → Create user in database
     → Return success message

2. 用户登录流程

User → POST /api/auth/login
     → AuthService.login()
     → Validate credentials
     → Verify password with bcrypt
     → Generate Access Token (15 min)
     → Generate Refresh Token (7 days)
     → Return tokens + user info

3. Token 刷新流程

User → POST /api/auth/refresh
     → AuthService.refresh_token()
     → Verify Refresh Token
     → Extract user info
     → Generate new Access Token
     → Generate new Refresh Token (rotation)
     → Return new tokens

4. 受保护端点访问流程

User → GET /api/projects (with Authorization header)
     → Auth Middleware
     → Extract token from header
     → Verify JWT signature
     → Check expiration
     → Extract user info
     → Attach to request.state.user
     → Continue to route handler
     → Route handler uses request.state.user

Components and Interfaces

1. 数据库模型

User Model (models.py)

class User:
    """User model representing a system user."""
    
    def __init__(
        self,
        id: str,
        username: str,
        email: str,
        password_hash: str,
        role: str,
        oauth_provider: Optional[str],
        oauth_id: Optional[str],
        created_at: datetime
    ):
        self.id = id
        self.username = username
        self.email = email
        self.password_hash = password_hash
        self.role = role
        self.oauth_provider = oauth_provider  # 预留 OAuth
        self.oauth_id = oauth_id  # 预留 OAuth
        self.created_at = created_at
    
    @classmethod
    def from_row(cls, row):
        """Create User instance from database row."""
        return cls(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            password_hash=row["password_hash"],
            role=row["role"],
            oauth_provider=row.get("oauth_provider"),
            oauth_id=row.get("oauth_id"),
            created_at=row["created_at"]
        )

2. Pydantic Schemas

Auth Schemas (schemas/auth.py)

from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime

class UserRegister(BaseModel):
    """User registration request schema."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=100)

class UserLogin(BaseModel):
    """User login request schema."""
    username: str
    password: str

class TokenResponse(BaseModel):
    """Token response schema."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    user: "UserResponse"

class TokenRefresh(BaseModel):
    """Token refresh request schema."""
    refresh_token: str

class UserResponse(BaseModel):
    """User response schema."""
    id: str
    username: str
    email: str
    role: str
    created_at: datetime

class TokenPayload(BaseModel):
    """JWT token payload schema."""
    sub: str  # user_id
    username: str
    email: str
    role: str
    exp: datetime
    iat: datetime

3. 服务层

JWT Service (services/jwt_service.py)

from datetime import datetime, timedelta
from typing import Dict, Optional
import jwt
from config import settings

class JWTService:
    """Service for JWT token operations."""
    
    @staticmethod
    def create_access_token(user_data: Dict) -> str:
        """
        Create access token with 15 minutes expiration.
        
        Args:
            user_data: Dict containing user_id, username, email, role
            
        Returns:
            Encoded JWT token string
        """
        expire = datetime.utcnow() + timedelta(
            minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
        payload = {
            "sub": user_data["id"],
            "username": user_data["username"],
            "email": user_data["email"],
            "role": user_data["role"],
            "exp": expire,
            "iat": datetime.utcnow(),
            "type": "access"
        }
        return jwt.encode(
            payload,
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM
        )
    
    @staticmethod
    def create_refresh_token(user_data: Dict) -> str:
        """
        Create refresh token with 7 days expiration.
        
        Args:
            user_data: Dict containing user_id
            
        Returns:
            Encoded JWT token string
        """
        expire = datetime.utcnow() + timedelta(
            days=settings.REFRESH_TOKEN_EXPIRE_DAYS
        )
        payload = {
            "sub": user_data["id"],
            "exp": expire,
            "iat": datetime.utcnow(),
            "type": "refresh"
        }
        return jwt.encode(
            payload,
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM
        )
    
    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> Optional[Dict]:
        """
        Verify and decode JWT token.
        
        Args:
            token: JWT token string
            token_type: Expected token type (access or refresh)
            
        Returns:
            Decoded payload dict or None if invalid
            
        Raises:
            jwt.ExpiredSignatureError: Token has expired
            jwt.InvalidTokenError: Token is invalid
        """
        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM]
            )
            
            # Verify token type
            if payload.get("type") != token_type:
                return None
                
            return payload
        except jwt.ExpiredSignatureError:
            raise
        except jwt.InvalidTokenError:
            raise

Auth Service (services/auth_service.py)

import bcrypt
import uuid
from typing import Optional, Dict
from database import get_db_connection
from models import User
from services.jwt_service import JWTService
from fastapi import HTTPException, status

class AuthService:
    """Service for authentication operations."""
    
    @staticmethod
    def register_user(username: str, email: str, password: str) -> User:
        """
        Register a new user.
        
        Args:
            username: Unique username
            email: Unique email address
            password: Plain text password
            
        Returns:
            Created User object
            
        Raises:
            HTTPException: 409 if username or email already exists
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            
            # Check username uniqueness
            cursor.execute(
                "SELECT id FROM users WHERE username = ?",
                (username,)
            )
            if cursor.fetchone():
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="用户名已被使用"
                )
            
            # Check email uniqueness
            cursor.execute(
                "SELECT id FROM users WHERE email = ?",
                (email,)
            )
            if cursor.fetchone():
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="邮箱已被使用"
                )
            
            # Hash password
            password_hash = bcrypt.hashpw(
                password.encode('utf-8'),
                bcrypt.gensalt()
            ).decode('utf-8')
            
            # Create user
            user_id = f"user_{uuid.uuid4().hex[:12]}"
            cursor.execute("""
                INSERT INTO users (
                    id, username, email, password_hash, role
                )
                VALUES (?, ?, ?, ?, ?)
            """, (user_id, username, email, password_hash, "annotator"))
            
            # Fetch created user
            cursor.execute(
                "SELECT * FROM users WHERE id = ?",
                (user_id,)
            )
            row = cursor.fetchone()
            return User.from_row(row)
    
    @staticmethod
    def login_user(username: str, password: str) -> Dict:
        """
        Authenticate user and generate tokens.
        
        Args:
            username: Username
            password: Plain text password
            
        Returns:
            Dict containing access_token, refresh_token, and user info
            
        Raises:
            HTTPException: 401 if credentials are invalid
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            
            # Find user
            cursor.execute(
                "SELECT * FROM users WHERE username = ?",
                (username,)
            )
            row = cursor.fetchone()
            
            if not row:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="用户名或密码错误"
                )
            
            user = User.from_row(row)
            
            # Verify password
            if not bcrypt.checkpw(
                password.encode('utf-8'),
                user.password_hash.encode('utf-8')
            ):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="用户名或密码错误"
                )
            
            # Generate tokens
            user_data = {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "role": user.role
            }
            
            access_token = JWTService.create_access_token(user_data)
            refresh_token = JWTService.create_refresh_token(user_data)
            
            return {
                "access_token": access_token,
                "refresh_token": refresh_token,
                "user": user_data
            }
    
    @staticmethod
    def refresh_tokens(refresh_token: str) -> Dict:
        """
        Refresh access token using refresh token.
        
        Args:
            refresh_token: Valid refresh token
            
        Returns:
            Dict containing new access_token and refresh_token
            
        Raises:
            HTTPException: 401 if refresh token is invalid or expired
        """
        try:
            payload = JWTService.verify_token(refresh_token, "refresh")
            if not payload:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="无效的刷新令牌"
                )
            
            user_id = payload["sub"]
            
            # Fetch user from database
            with get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT * FROM users WHERE id = ?",
                    (user_id,)
                )
                row = cursor.fetchone()
                
                if not row:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="用户不存在"
                    )
                
                user = User.from_row(row)
                user_data = {
                    "id": user.id,
                    "username": user.username,
                    "email": user.email,
                    "role": user.role
                }
                
                # Generate new tokens (token rotation)
                new_access_token = JWTService.create_access_token(user_data)
                new_refresh_token = JWTService.create_refresh_token(user_data)
                
                return {
                    "access_token": new_access_token,
                    "refresh_token": new_refresh_token,
                    "user": user_data
                }
                
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="刷新令牌已过期或无效,请重新登录"
            )
    
    @staticmethod
    def get_current_user(user_id: str) -> User:
        """
        Get user by ID.
        
        Args:
            user_id: User unique identifier
            
        Returns:
            User object
            
        Raises:
            HTTPException: 404 if user not found
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM users WHERE id = ?",
                (user_id,)
            )
            row = cursor.fetchone()
            
            if not row:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="用户不存在"
                )
            
            return User.from_row(row)

4. 中间件

Auth Middleware (middleware/auth_middleware.py)

from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from services.jwt_service import JWTService
import jwt

class AuthMiddleware(BaseHTTPMiddleware):
    """
    Authentication middleware for JWT token verification.
    Validates JWT tokens and attaches user info to request state.
    """
    
    # Public endpoints that don't require authentication
    PUBLIC_PATHS = {
        "/",
        "/health",
        "/docs",
        "/openapi.json",
        "/redoc",
        "/api/auth/register",
        "/api/auth/login",
        "/api/auth/refresh",
    }
    
    async def dispatch(self, request: Request, call_next):
        """
        Process each request through authentication.
        
        Args:
            request: FastAPI Request object
            call_next: Next middleware or route handler
            
        Returns:
            Response from next handler or error response
        """
        # Skip authentication for public paths
        if request.url.path in self.PUBLIC_PATHS:
            return await call_next(request)
        
        # Skip authentication for OPTIONS requests (CORS preflight)
        if request.method == "OPTIONS":
            return await call_next(request)
        
        # Extract token from Authorization header
        auth_header = request.headers.get("Authorization")
        
        if not auth_header:
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={
                    "detail": "缺少认证令牌",
                    "error_type": "missing_token"
                }
            )
        
        # Verify Bearer token format
        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={
                    "detail": "无效的认证令牌格式",
                    "error_type": "invalid_token_format"
                }
            )
        
        token = parts[1]
        
        try:
            # Verify and decode token
            payload = JWTService.verify_token(token, "access")
            
            if not payload:
                return JSONResponse(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    content={
                        "detail": "无效的认证令牌",
                        "error_type": "invalid_token"
                    }
                )
            
            # Attach user info to request state
            request.state.user = {
                "id": payload["sub"],
                "username": payload["username"],
                "email": payload["email"],
                "role": payload["role"]
            }
            
            # Continue to route handler
            response = await call_next(request)
            return response
            
        except jwt.ExpiredSignatureError:
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={
                    "detail": "认证令牌已过期",
                    "error_type": "token_expired"
                }
            )
        except jwt.InvalidTokenError:
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={
                    "detail": "无效的认证令牌",
                    "error_type": "invalid_token"
                }
            )
        except Exception as e:
            return JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content={
                    "detail": "认证过程发生错误",
                    "error_type": "auth_error"
                }
            )


def require_role(*allowed_roles: str):
    """
    Decorator to check user role.
    
    Usage:
        @require_role("admin", "annotator")
        async def my_endpoint(request: Request):
            ...
    
    Args:
        allowed_roles: Tuple of allowed role names
        
    Returns:
        Decorator function
    """
    def decorator(func):
        async def wrapper(request: Request, *args, **kwargs):
            user = getattr(request.state, "user", None)
            
            if not user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="未认证"
                )
            
            if user["role"] not in allowed_roles:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="权限不足"
                )
            
            return await func(request, *args, **kwargs)
        return wrapper
    return decorator

5. 路由

Auth Router (routers/auth.py)

from fastapi import APIRouter, HTTPException, status, Request
from schemas.auth import (
    UserRegister, UserLogin, TokenResponse,
    TokenRefresh, UserResponse
)
from services.auth_service import AuthService

router = APIRouter(
    prefix="/api/auth",
    tags=["authentication"]
)

@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(user_data: UserRegister):
    """
    Register a new user.
    
    Args:
        user_data: User registration data
        
    Returns:
        Success message
        
    Raises:
        HTTPException: 409 if username or email already exists
        HTTPException: 400 if validation fails
    """
    user = AuthService.register_user(
        username=user_data.username,
        email=user_data.email,
        password=user_data.password
    )
    
    return {
        "message": "用户注册成功",
        "user_id": user.id
    }

@router.post("/login", response_model=TokenResponse)
async def login(credentials: UserLogin):
    """
    Authenticate user and return JWT tokens.
    
    Args:
        credentials: User login credentials
        
    Returns:
        Access token, refresh token, and user info
        
    Raises:
        HTTPException: 401 if credentials are invalid
    """
    result = AuthService.login_user(
        username=credentials.username,
        password=credentials.password
    )
    
    return TokenResponse(
        access_token=result["access_token"],
        refresh_token=result["refresh_token"],
        user=UserResponse(**result["user"])
    )

@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(token_data: TokenRefresh):
    """
    Refresh access token using refresh token.
    
    Args:
        token_data: Refresh token
        
    Returns:
        New access token and refresh token
        
    Raises:
        HTTPException: 401 if refresh token is invalid or expired
    """
    result = AuthService.refresh_tokens(token_data.refresh_token)
    
    return TokenResponse(
        access_token=result["access_token"],
        refresh_token=result["refresh_token"],
        user=UserResponse(**result["user"])
    )

@router.get("/me", response_model=UserResponse)
async def get_current_user(request: Request):
    """
    Get current authenticated user info.
    
    Args:
        request: FastAPI Request with user info in state
        
    Returns:
        Current user information
        
    Raises:
        HTTPException: 401 if not authenticated
    """
    user_data = getattr(request.state, "user", None)
    
    if not user_data:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未认证"
        )
    
    user = AuthService.get_current_user(user_data["id"])
    
    return UserResponse(
        id=user.id,
        username=user.username,
        email=user.email,
        role=user.role,
        created_at=user.created_at
    )

6. 配置

Config (config.py)

import os
import secrets
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """Application settings."""
    
    # JWT Settings
    JWT_SECRET_KEY: str = os.getenv(
        "JWT_SECRET_KEY",
        secrets.token_urlsafe(32)
    )
    JWT_ALGORITHM: str = os.getenv("JWT_ALGORITHM", "HS256")
    ACCESS_TOKEN_EXPIRE_MINUTES: int = int(
        os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "15")
    )
    REFRESH_TOKEN_EXPIRE_DAYS: int = int(
        os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7")
    )
    
    # Database Settings
    DATABASE_PATH: str = os.getenv(
        "DATABASE_PATH",
        "annotation_platform.db"
    )
    
    # OAuth Settings (预留)
    OAUTH_ENABLED: bool = os.getenv("OAUTH_ENABLED", "false").lower() == "true"
    OAUTH_PROVIDER_URL: str = os.getenv("OAUTH_PROVIDER_URL", "")
    OAUTH_CLIENT_ID: str = os.getenv("OAUTH_CLIENT_ID", "")
    OAUTH_CLIENT_SECRET: str = os.getenv("OAUTH_CLIENT_SECRET", "")
    
    class Config:
        env_file = ".env"
        case_sensitive = True

settings = Settings()

# Warn if using default JWT secret
if settings.JWT_SECRET_KEY == secrets.token_urlsafe(32):
    import logging
    logging.warning(
        "使用默认生成的 JWT_SECRET_KEY,生产环境请设置环境变量"
    )

Data Models

Database Schema

Users Table

CREATE TABLE IF NOT EXISTS users (
    id TEXT PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    email TEXT NOT NULL UNIQUE,
    password_hash TEXT NOT NULL,
    role TEXT NOT NULL DEFAULT 'annotator',
    oauth_provider TEXT,  -- 预留: 'google', 'github', etc.
    oauth_id TEXT,        -- 预留: OAuth provider user ID
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_oauth ON users(oauth_provider, oauth_id);

字段说明

字段 类型 说明 约束
id TEXT 用户唯一标识 PRIMARY KEY, 格式: user_xxxxx
username TEXT 用户名 NOT NULL, UNIQUE, 3-50字符
email TEXT 邮箱地址 NOT NULL, UNIQUE, 有效邮箱格式
password_hash TEXT bcrypt 加密的密码 NOT NULL
role TEXT 用户角色 NOT NULL, 枚举: admin/annotator/viewer
oauth_provider TEXT OAuth 提供商 NULLABLE, 预留字段
oauth_id TEXT OAuth 用户 ID NULLABLE, 预留字段
created_at TIMESTAMP 创建时间 DEFAULT CURRENT_TIMESTAMP
updated_at TIMESTAMP 更新时间 DEFAULT CURRENT_TIMESTAMP

JWT Token Structure

Access Token Payload

{
  "sub": "user_abc123def456",
  "username": "john_doe",
  "email": "john@example.com",
  "role": "annotator",
  "type": "access",
  "iat": 1704067200,
  "exp": 1704068100
}

Refresh Token Payload

{
  "sub": "user_abc123def456",
  "type": "refresh",
  "iat": 1704067200,
  "exp": 1704672000
}

API Request/Response Examples

注册请求

POST /api/auth/register
Content-Type: application/json

{
  "username": "john_doe",
  "email": "john@example.com",
  "password": "SecurePass123!"
}

注册响应

HTTP/1.1 201 Created
Content-Type: application/json

{
  "message": "用户注册成功",
  "user_id": "user_abc123def456"
}

登录请求

POST /api/auth/login
Content-Type: application/json

{
  "username": "john_doe",
  "password": "SecurePass123!"
}

登录响应

HTTP/1.1 200 OK
Content-Type: application/json

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "user": {
    "id": "user_abc123def456",
    "username": "john_doe",
    "email": "john@example.com",
    "role": "annotator",
    "created_at": "2024-01-01T00:00:00Z"
  }
}

受保护端点请求

GET /api/projects
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

Correctness Properties

属性(Property)是系统在所有有效执行中应该保持为真的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性作为人类可读规范和机器可验证正确性保证之间的桥梁。

Property 1: 用户注册创建有效用户

*对于任何*有效的用户名、邮箱和密码组合,注册后应该在数据库中创建一个新用户记录,且该记录包含正确的用户名、邮箱和加密后的密码。

Validates: Requirements 1.1, 1.5

Property 2: 密码哈希不可逆

*对于任何*注册的用户,数据库中存储的 password_hash 字段应该不等于原始密码,且应该是有效的 bcrypt 哈希格式(以 $2b$ 开头)。

Validates: Requirements 1.5, 7.1

Property 3: 短密码被拒绝

*对于任何*长度小于 8 个字符的密码字符串,注册请求应该返回 400 错误,且不应该在数据库中创建用户记录。

Validates: Requirements 1.4

Property 4: 新用户默认角色

*对于任何*新注册的用户,其角色字段应该自动设置为 "annotator"。

Validates: Requirements 1.6

Property 5: 登录返回有效 Tokens

*对于任何*已注册的用户,使用正确的用户名和密码登录应该返回包含 access_token、refresh_token 和 user 信息的响应,且两个 token 都应该是有效的 JWT 格式。

Validates: Requirements 2.1, 2.6

Property 6: Access Token 包含完整用户信息

*对于任何*登录成功返回的 access_token,解码后的 payload 应该包含 sub(用户ID)、username、email 和 role 字段。

Validates: Requirements 2.3

Property 7: Token 过期时间正确设置

*对于任何*生成的 access_token,其过期时间(exp)与签发时间(iat)的差值应该等于配置的 ACCESS_TOKEN_EXPIRE_MINUTES;对于任何 refresh_token,差值应该等于配置的 REFRESH_TOKEN_EXPIRE_DAYS。

Validates: Requirements 2.4, 2.5

Property 8: Token 刷新生成新令牌

*对于任何*有效的 refresh_token,使用它刷新应该返回新的 access_token 和 refresh_token,且新的 refresh_token 应该与旧的不同(token rotation)。

Validates: Requirements 3.1, 3.4

Property 9: 中间件验证有效 Token

*对于任何*受保护的端点和有效的 JWT access_token,请求应该成功通过中间件验证,且 request.state.user 应该包含正确的用户信息(id、username、email、role)。

Validates: Requirements 4.1, 4.5

Property 10: 用户信息端点返回正确数据

*对于任何*已认证的用户,请求 /api/auth/me 端点应该返回该用户的完整信息(id、username、email、role、created_at),且信息应该与数据库中的记录一致。

Validates: Requirements 6.1

Property 11: 现有 API 端点受保护

对于任何 /api/projects、/api/tasks 或 /api/annotations 端点,不带有效 token 的请求应该返回 401 错误,带有效 token 的请求应该成功处理。

Validates: Requirements 11.1, 11.2, 11.3

Property 12: 角色权限控制

*对于任何*需要特定角色的端点,只有具有允许角色的用户才能成功访问,其他角色的用户应该收到 403 错误。

Validates: Requirements 12.2, 12.3, 12.4

Error Handling

错误响应格式

所有错误响应遵循统一的 JSON 格式:

{
  "detail": "错误描述信息",
  "error_type": "error_category"
}

错误类型分类

1. 认证错误 (401 Unauthorized)

2. 授权错误 (403 Forbidden)

Error Type Detail 场景
missing_token 缺少认证令牌 Authorization header 不存在
invalid_token_format 无效的认证令牌格式 不是 "Bearer " 格式
invalid_token 无效的认证令牌 JWT 签名验证失败
token_expired 认证令牌已过期 JWT 过期时间已过
invalid_credentials 用户名或密码错误 登录凭据不正确
user_not_found 用户不存在 刷新 token 时用户已被删除
Error Type Detail 场景
insufficient_permissions 权限不足 用户角色不满足端点要求

3. 客户端错误 (400 Bad Request)

Error Type Detail 场景
validation_error 输入验证失败 Pydantic 验证失败
password_too_short 密码长度不足 密码少于 8 个字符
invalid_email 无效的邮箱格式 邮箱格式不正确

4. 冲突错误 (409 Conflict)

Error Type Detail 场景
username_exists 用户名已被使用 注册时用户名重复
email_exists 邮箱已被使用 注册时邮箱重复

5. 服务器错误 (500 Internal Server Error)

Error Type Detail 场景
auth_error 认证过程发生错误 中间件内部错误
database_error 数据库操作失败 数据库连接或查询错误
internal_error 服务器内部错误 未预期的异常

错误处理策略

1. 日志记录

import logging

logger = logging.getLogger(__name__)

# 记录认证失败(不记录密码)
logger.warning(
    f"Login failed for username: {username}",
    extra={"username": username, "ip": request.client.host}
)

# 记录服务器错误
logger.error(
    f"Authentication error: {str(e)}",
    exc_info=True,
    extra={"user_id": user_id}
)

2. 异常处理层次

FastAPI Exception Handlers
    ↓
Custom Exception Classes
    ↓
Service Layer Exceptions
    ↓
Database Exceptions

3. 自定义异常类

class AuthenticationError(Exception):
    """Base authentication exception."""
    pass

class InvalidCredentialsError(AuthenticationError):
    """Invalid username or password."""
    pass

class TokenExpiredError(AuthenticationError):
    """JWT token has expired."""
    pass

class InsufficientPermissionsError(AuthenticationError):
    """User lacks required permissions."""
    pass

安全考虑

  1. 密码错误不泄露信息:用户名不存在和密码错误返回相同的错误消息
  2. 限制错误详情:生产环境不暴露内部错误堆栈
  3. 速率限制:登录失败次数限制(未来实现)
  4. 审计日志:记录所有认证相关的操作

Testing Strategy

测试方法

本项目采用双重测试方法,结合单元测试和属性测试,确保全面的代码覆盖和正确性验证。

单元测试 (Unit Tests)

  • 验证特定示例和边界情况
  • 测试错误条件和异常处理
  • 测试集成点和组件交互
  • 使用 pytest 框架

属性测试 (Property-Based Tests)

  • 验证跨所有输入的通用属性
  • 通过随机化实现全面的输入覆盖
  • 使用 Hypothesis 库
  • 每个属性测试最少 100 次迭代

测试框架和工具

# requirements.txt
pytest==7.4.3
pytest-asyncio==0.21.1
hypothesis==6.92.1
faker==20.1.0  # 生成测试数据

测试文件组织

backend/test/
├── __init__.py
├── conftest.py                    # pytest fixtures
├── test_auth_service.py           # AuthService 单元测试
├── test_jwt_service.py            # JWTService 单元测试
├── test_auth_middleware.py        # 中间件单元测试
├── test_auth_router.py            # 路由集成测试
├── test_properties_auth.py        # 属性测试
└── test_integration_auth.py       # 端到端集成测试

测试配置

pytest.ini

[pytest]
testpaths = test
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
markers =
    unit: Unit tests
    integration: Integration tests
    property: Property-based tests
    slow: Slow running tests

conftest.py (测试 Fixtures)

import pytest
import os
from database import init_database, get_db_connection
from fastapi.testclient import TestClient
from main import app

@pytest.fixture(scope="function")
def test_db():
    """Create a test database for each test."""
    # Use in-memory database for tests
    os.environ["DATABASE_PATH"] = ":memory:"
    init_database()
    yield
    # Cleanup handled by in-memory database

@pytest.fixture
def client(test_db):
    """Create a test client."""
    return TestClient(app)

@pytest.fixture
def sample_user_data():
    """Sample user data for testing."""
    return {
        "username": "testuser",
        "email": "test@example.com",
        "password": "TestPass123!"
    }

@pytest.fixture
def registered_user(client, sample_user_data):
    """Register a user and return the data."""
    response = client.post("/api/auth/register", json=sample_user_data)
    assert response.status_code == 201
    return sample_user_data

@pytest.fixture
def auth_token(client, registered_user):
    """Get authentication token for a registered user."""
    response = client.post("/api/auth/login", json={
        "username": registered_user["username"],
        "password": registered_user["password"]
    })
    assert response.status_code == 200
    return response.json()["access_token"]

单元测试示例

测试用户注册

# test/test_auth_service.py
import pytest
from services.auth_service import AuthService
from fastapi import HTTPException

@pytest.mark.unit
def test_register_user_success(test_db, sample_user_data):
    """Test successful user registration."""
    user = AuthService.register_user(
        username=sample_user_data["username"],
        email=sample_user_data["email"],
        password=sample_user_data["password"]
    )
    
    assert user.username == sample_user_data["username"]
    assert user.email == sample_user_data["email"]
    assert user.role == "annotator"
    assert user.password_hash != sample_user_data["password"]
    assert user.password_hash.startswith("$2b$")

@pytest.mark.unit
def test_register_duplicate_username(test_db, sample_user_data):
    """Test registration with duplicate username."""
    # Register first user
    AuthService.register_user(**sample_user_data)
    
    # Try to register with same username
    with pytest.raises(HTTPException) as exc_info:
        AuthService.register_user(
            username=sample_user_data["username"],
            email="different@example.com",
            password=sample_user_data["password"]
        )
    
    assert exc_info.value.status_code == 409
    assert "用户名已被使用" in exc_info.value.detail

@pytest.mark.unit
def test_register_short_password(test_db):
    """Test registration with password less than 8 characters."""
    # This should be caught by Pydantic validation
    # Test at router level
    pass

测试 JWT Token

# test/test_jwt_service.py
import pytest
import jwt
from datetime import datetime, timedelta
from services.jwt_service import JWTService
from config import settings

@pytest.mark.unit
def test_create_access_token():
    """Test access token creation."""
    user_data = {
        "id": "user_123",
        "username": "testuser",
        "email": "test@example.com",
        "role": "annotator"
    }
    
    token = JWTService.create_access_token(user_data)
    
    # Decode and verify
    payload = jwt.decode(
        token,
        settings.JWT_SECRET_KEY,
        algorithms=[settings.JWT_ALGORITHM]
    )
    
    assert payload["sub"] == user_data["id"]
    assert payload["username"] == user_data["username"]
    assert payload["email"] == user_data["email"]
    assert payload["role"] == user_data["role"]
    assert payload["type"] == "access"

@pytest.mark.unit
def test_verify_expired_token():
    """Test verification of expired token."""
    user_data = {"id": "user_123"}
    
    # Create token with past expiration
    expire = datetime.utcnow() - timedelta(minutes=1)
    payload = {
        "sub": user_data["id"],
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access"
    }
    token = jwt.encode(
        payload,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM
    )
    
    # Should raise ExpiredSignatureError
    with pytest.raises(jwt.ExpiredSignatureError):
        JWTService.verify_token(token, "access")

属性测试示例

使用 Hypothesis 进行属性测试

# test/test_properties_auth.py
import pytest
from hypothesis import given, strategies as st, settings
from services.auth_service import AuthService
from services.jwt_service import JWTService
import bcrypt
import jwt as pyjwt

# Custom strategies for generating test data
username_strategy = st.text(
    alphabet=st.characters(whitelist_categories=('Lu', 'Ll', 'Nd')),
    min_size=3,
    max_size=50
)

email_strategy = st.emails()

password_strategy = st.text(min_size=8, max_size=100)

@pytest.mark.property
@settings(max_examples=100)
@given(
    username=username_strategy,
    email=email_strategy,
    password=password_strategy
)
def test_property_password_hash_not_reversible(test_db, username, email, password):
    """
    Property 2: 密码哈希不可逆
    
    Feature: jwt-authentication, Property 2: 对于任何注册的用户,
    数据库中存储的 password_hash 字段应该不等于原始密码,
    且应该是有效的 bcrypt 哈希格式。
    """
    try:
        user = AuthService.register_user(username, email, password)
        
        # Password hash should not equal original password
        assert user.password_hash != password
        
        # Should be valid bcrypt format
        assert user.password_hash.startswith("$2b$")
        
        # Should be able to verify with bcrypt
        assert bcrypt.checkpw(
            password.encode('utf-8'),
            user.password_hash.encode('utf-8')
        )
    except Exception:
        # Skip if registration fails due to constraints
        pass

@pytest.mark.property
@settings(max_examples=100)
@given(password=st.text(min_size=0, max_size=7))
def test_property_short_passwords_rejected(test_db, password):
    """
    Property 3: 短密码被拒绝
    
    Feature: jwt-authentication, Property 3: 对于任何长度小于 8 个字符的密码字符串,
    注册请求应该返回 400 错误。
    """
    # This should be tested at router level with TestClient
    # since Pydantic validation happens at request parsing
    pass

@pytest.mark.property
@settings(max_examples=100)
@given(
    username=username_strategy,
    email=email_strategy,
    password=password_strategy
)
def test_property_new_user_default_role(test_db, username, email, password):
    """
    Property 4: 新用户默认角色
    
    Feature: jwt-authentication, Property 4: 对于任何新注册的用户,
    其角色字段应该自动设置为 "annotator"。
    """
    try:
        user = AuthService.register_user(username, email, password)
        assert user.role == "annotator"
    except Exception:
        # Skip if registration fails
        pass

@pytest.mark.property
@settings(max_examples=100)
@given(
    username=username_strategy,
    email=email_strategy,
    password=password_strategy
)
def test_property_login_returns_valid_tokens(test_db, username, email, password):
    """
    Property 5: 登录返回有效 Tokens
    
    Feature: jwt-authentication, Property 5: 对于任何已注册的用户,
    使用正确的用户名和密码登录应该返回有效的 JWT tokens。
    """
    try:
        # Register user
        AuthService.register_user(username, email, password)
        
        # Login
        result = AuthService.login_user(username, password)
        
        # Should have tokens
        assert "access_token" in result
        assert "refresh_token" in result
        assert "user" in result
        
        # Tokens should be valid JWT format
        access_payload = pyjwt.decode(
            result["access_token"],
            options={"verify_signature": False}
        )
        refresh_payload = pyjwt.decode(
            result["refresh_token"],
            options={"verify_signature": False}
        )
        
        assert access_payload["type"] == "access"
        assert refresh_payload["type"] == "refresh"
        
    except Exception:
        # Skip if registration or login fails
        pass

@pytest.mark.property
@settings(max_examples=100)
def test_property_access_token_contains_user_info(test_db, sample_user_data):
    """
    Property 6: Access Token 包含完整用户信息
    
    Feature: jwt-authentication, Property 6: 对于任何登录成功返回的 access_token,
    解码后的 payload 应该包含完整的用户信息。
    """
    # Register and login
    AuthService.register_user(**sample_user_data)
    result = AuthService.login_user(
        sample_user_data["username"],
        sample_user_data["password"]
    )
    
    # Decode token
    payload = pyjwt.decode(
        result["access_token"],
        options={"verify_signature": False}
    )
    
    # Should contain all required fields
    assert "sub" in payload  # user_id
    assert "username" in payload
    assert "email" in payload
    assert "role" in payload
    assert payload["username"] == sample_user_data["username"]
    assert payload["email"] == sample_user_data["email"]

@pytest.mark.property
@settings(max_examples=100)
def test_property_token_expiration_times(test_db, sample_user_data):
    """
    Property 7: Token 过期时间正确设置
    
    Feature: jwt-authentication, Property 7: 对于任何生成的 token,
    其过期时间应该符合配置的值。
    """
    from config import settings
    from datetime import datetime
    
    # Register and login
    AuthService.register_user(**sample_user_data)
    result = AuthService.login_user(
        sample_user_data["username"],
        sample_user_data["password"]
    )
    
    # Check access token expiration
    access_payload = pyjwt.decode(
        result["access_token"],
        options={"verify_signature": False}
    )
    access_exp = datetime.fromtimestamp(access_payload["exp"])
    access_iat = datetime.fromtimestamp(access_payload["iat"])
    access_diff = (access_exp - access_iat).total_seconds() / 60
    
    # Should be approximately ACCESS_TOKEN_EXPIRE_MINUTES
    assert abs(access_diff - settings.ACCESS_TOKEN_EXPIRE_MINUTES) < 1
    
    # Check refresh token expiration
    refresh_payload = pyjwt.decode(
        result["refresh_token"],
        options={"verify_signature": False}
    )
    refresh_exp = datetime.fromtimestamp(refresh_payload["exp"])
    refresh_iat = datetime.fromtimestamp(refresh_payload["iat"])
    refresh_diff = (refresh_exp - refresh_iat).total_seconds() / (60 * 60 * 24)
    
    # Should be approximately REFRESH_TOKEN_EXPIRE_DAYS
    assert abs(refresh_diff - settings.REFRESH_TOKEN_EXPIRE_DAYS) < 0.1

@pytest.mark.property
@settings(max_examples=100)
def test_property_token_refresh_rotation(test_db, sample_user_data):
    """
    Property 8: Token 刷新生成新令牌
    
    Feature: jwt-authentication, Property 8: 对于任何有效的 refresh_token,
    刷新应该返回新的 tokens,且新的 refresh_token 与旧的不同。
    """
    # Register and login
    AuthService.register_user(**sample_user_data)
    result1 = AuthService.login_user(
        sample_user_data["username"],
        sample_user_data["password"]
    )
    
    old_refresh_token = result1["refresh_token"]
    
    # Refresh tokens
    result2 = AuthService.refresh_tokens(old_refresh_token)
    
    new_access_token = result2["access_token"]
    new_refresh_token = result2["refresh_token"]
    
    # Should have new tokens
    assert new_access_token != result1["access_token"]
    assert new_refresh_token != old_refresh_token
    
    # New tokens should be valid
    access_payload = pyjwt.decode(
        new_access_token,
        options={"verify_signature": False}
    )
    assert access_payload["type"] == "access"

集成测试示例

# test/test_integration_auth.py
import pytest
from fastapi.testclient import TestClient

@pytest.mark.integration
def test_full_authentication_flow(client):
    """Test complete authentication flow from registration to protected endpoint access."""
    
    # 1. Register user
    register_data = {
        "username": "integrationuser",
        "email": "integration@example.com",
        "password": "IntegrationPass123!"
    }
    response = client.post("/api/auth/register", json=register_data)
    assert response.status_code == 201
    
    # 2. Login
    login_data = {
        "username": register_data["username"],
        "password": register_data["password"]
    }
    response = client.post("/api/auth/login", json=login_data)
    assert response.status_code == 200
    tokens = response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    
    # 3. Access protected endpoint
    headers = {"Authorization": f"Bearer {access_token}"}
    response = client.get("/api/auth/me", headers=headers)
    assert response.status_code == 200
    user_info = response.json()
    assert user_info["username"] == register_data["username"]
    
    # 4. Access protected project endpoint
    response = client.get("/api/projects", headers=headers)
    assert response.status_code == 200
    
    # 5. Refresh tokens
    response = client.post(
        "/api/auth/refresh",
        json={"refresh_token": refresh_token}
    )
    assert response.status_code == 200
    new_tokens = response.json()
    assert new_tokens["access_token"] != access_token
    
    # 6. Use new token
    new_headers = {"Authorization": f"Bearer {new_tokens['access_token']}"}
    response = client.get("/api/auth/me", headers=new_headers)
    assert response.status_code == 200

@pytest.mark.integration
def test_middleware_protects_endpoints(client, auth_token):
    """
    Property 11: 现有 API 端点受保护
    
    Feature: jwt-authentication, Property 11: 对于任何受保护的端点,
    不带 token 的请求应该返回 401,带有效 token 的请求应该成功。
    """
    protected_endpoints = [
        "/api/projects",
        "/api/tasks",
        "/api/annotations"
    ]
    
    for endpoint in protected_endpoints:
        # Without token - should fail
        response = client.get(endpoint)
        assert response.status_code == 401
        
        # With token - should succeed
        headers = {"Authorization": f"Bearer {auth_token}"}
        response = client.get(endpoint, headers=headers)
        assert response.status_code == 200

@pytest.mark.integration
def test_public_endpoints_accessible(client):
    """Test that public endpoints don't require authentication."""
    public_endpoints = [
        "/",
        "/health",
        "/docs",
        "/openapi.json"
    ]
    
    for endpoint in public_endpoints:
        response = client.get(endpoint)
        assert response.status_code in [200, 404]  # 404 is ok if endpoint doesn't exist

测试覆盖率目标

  • 代码覆盖率: 最低 80%
  • 分支覆盖率: 最低 70%
  • 关键路径覆盖: 100%(认证、授权、token 生成)

运行测试

# 运行所有测试
pytest

# 运行单元测试
pytest -m unit

# 运行属性测试
pytest -m property

# 运行集成测试
pytest -m integration

# 生成覆盖率报告
pytest --cov=. --cov-report=html

# 运行特定测试文件
pytest test/test_auth_service.py

# 详细输出
pytest -v

# 显示打印输出
pytest -s

前端集成指南

React 前端实现概述

前端需要实现以下功能来与 JWT 认证系统集成:

  1. 登录/注册表单
  2. Token 存储和管理
  3. HTTP 请求拦截器(自动附加 token)
  4. Token 刷新逻辑
  5. 认证状态管理(使用 Jotai)
  6. 路由保护

1. 认证 Atoms (Jotai 状态管理)

// web/apps/lq_label/src/atoms/auth-atoms.ts
import { atom } from 'jotai';
import { atomWithStorage } from 'jotai/utils';

interface User {
  id: string;
  username: string;
  email: string;
  role: string;
  created_at: string;
}

interface AuthTokens {
  access_token: string;
  refresh_token: string;
}

// 使用 localStorage 持久化 tokens
export const authTokensAtom = atomWithStorage<AuthTokens | null>(
  'auth_tokens',
  null
);

// 当前用户信息
export const currentUserAtom = atomWithStorage<User | null>(
  'current_user',
  null
);

// 认证状态
export const isAuthenticatedAtom = atom(
  (get) => {
    const tokens = get(authTokensAtom);
    return tokens !== null && tokens.access_token !== '';
  }
);

// 登出 action
export const logoutAtom = atom(
  null,
  (get, set) => {
    set(authTokensAtom, null);
    set(currentUserAtom, null);
  }
);

2. 认证 API 服务

// web/apps/lq_label/src/services/auth-service.ts
import axios from 'axios';

const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000';

export interface RegisterData {
  username: string;
  email: string;
  password: string;
}

export interface LoginData {
  username: string;
  password: string;
}

export interface AuthResponse {
  access_token: string;
  refresh_token: string;
  token_type: string;
  user: {
    id: string;
    username: string;
    email: string;
    role: string;
    created_at: string;
  };
}

export const authService = {
  async register(data: RegisterData): Promise<{ message: string; user_id: string }> {
    const response = await axios.post(`${API_BASE_URL}/api/auth/register`, data);
    return response.data;
  },

  async login(data: LoginData): Promise<AuthResponse> {
    const response = await axios.post(`${API_BASE_URL}/api/auth/login`, data);
    return response.data;
  },

  async refreshToken(refreshToken: string): Promise<AuthResponse> {
    const response = await axios.post(`${API_BASE_URL}/api/auth/refresh`, {
      refresh_token: refreshToken,
    });
    return response.data;
  },

  async getCurrentUser(accessToken: string): Promise<AuthResponse['user']> {
    const response = await axios.get(`${API_BASE_URL}/api/auth/me`, {
      headers: {
        Authorization: `Bearer ${accessToken}`,
      },
    });
    return response.data;
  },
};

3. Axios 拦截器配置

// web/apps/lq_label/src/utils/axios-config.ts
import axios, { AxiosError, InternalAxiosRequestConfig } from 'axios';
import { authTokensAtom, logoutAtom } from '../atoms/auth-atoms';
import { authService } from '../services/auth-service';
import { getDefaultStore } from 'jotai';

const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000';

// 创建 axios 实例
export const apiClient = axios.create({
  baseURL: API_BASE_URL,
  headers: {
    'Content-Type': 'application/json',
  },
});

// Jotai store
const store = getDefaultStore();

// 请求拦截器:自动附加 access token
apiClient.interceptors.request.use(
  (config: InternalAxiosRequestConfig) => {
    const tokens = store.get(authTokensAtom);
    
    if (tokens?.access_token) {
      config.headers.Authorization = `Bearer ${tokens.access_token}`;
    }
    
    return config;
  },
  (error) => {
    return Promise.reject(error);
  }
);

// 响应拦截器:处理 token 过期
let isRefreshing = false;
let failedQueue: Array<{
  resolve: (value?: unknown) => void;
  reject: (reason?: unknown) => void;
}> = [];

const processQueue = (error: Error | null, token: string | null = null) => {
  failedQueue.forEach((prom) => {
    if (error) {
      prom.reject(error);
    } else {
      prom.resolve(token);
    }
  });
  
  failedQueue = [];
};

apiClient.interceptors.response.use(
  (response) => response,
  async (error: AxiosError) => {
    const originalRequest = error.config as InternalAxiosRequestConfig & {
      _retry?: boolean;
    };

    // 如果是 401 错误且不是登录/注册请求
    if (
      error.response?.status === 401 &&
      originalRequest &&
      !originalRequest._retry &&
      !originalRequest.url?.includes('/auth/login') &&
      !originalRequest.url?.includes('/auth/register')
    ) {
      if (isRefreshing) {
        // 如果正在刷新,将请求加入队列
        return new Promise((resolve, reject) => {
          failedQueue.push({ resolve, reject });
        })
          .then((token) => {
            originalRequest.headers.Authorization = `Bearer ${token}`;
            return apiClient(originalRequest);
          })
          .catch((err) => {
            return Promise.reject(err);
          });
      }

      originalRequest._retry = true;
      isRefreshing = true;

      const tokens = store.get(authTokensAtom);

      if (!tokens?.refresh_token) {
        // 没有 refresh token,直接登出
        store.set(logoutAtom);
        return Promise.reject(error);
      }

      try {
        // 尝试刷新 token
        const response = await authService.refreshToken(tokens.refresh_token);
        
        // 更新 tokens
        store.set(authTokensAtom, {
          access_token: response.access_token,
          refresh_token: response.refresh_token,
        });

        // 处理队列中的请求
        processQueue(null, response.access_token);

        // 重试原始请求
        originalRequest.headers.Authorization = `Bearer ${response.access_token}`;
        return apiClient(originalRequest);
      } catch (refreshError) {
        // 刷新失败,登出用户
        processQueue(refreshError as Error, null);
        store.set(logoutAtom);
        return Promise.reject(refreshError);
      } finally {
        isRefreshing = false;
      }
    }

    return Promise.reject(error);
  }
);

4. 登录组件

// web/apps/lq_label/src/components/login-form/login-form.tsx
import React, { useState } from 'react';
import { useAtom } from 'jotai';
import { useNavigate } from 'react-router-dom';
import { authTokensAtom, currentUserAtom } from '../../atoms/auth-atoms';
import { authService } from '../../services/auth-service';

export const LoginForm: React.FC = () => {
  const [, setTokens] = useAtom(authTokensAtom);
  const [, setCurrentUser] = useAtom(currentUserAtom);
  const navigate = useNavigate();

  const [formData, setFormData] = useState({
    username: '',
    password: '',
  });
  const [error, setError] = useState<string | null>(null);
  const [loading, setLoading] = useState(false);

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setError(null);
    setLoading(true);

    try {
      const response = await authService.login(formData);
      
      // 保存 tokens 和用户信息
      setTokens({
        access_token: response.access_token,
        refresh_token: response.refresh_token,
      });
      setCurrentUser(response.user);

      // 跳转到主页
      navigate('/');
    } catch (err: any) {
      setError(err.response?.data?.detail || '登录失败,请重试');
    } finally {
      setLoading(false);
    }
  };

  return (
    <form onSubmit={handleSubmit} className="space-y-comfortable">
      <div>
        <label htmlFor="username" className="text-body-medium">
          用户名
        </label>
        <input
          id="username"
          type="text"
          value={formData.username}
          onChange={(e) => setFormData({ ...formData, username: e.target.value })}
          required
          className="w-full p-comfortable border rounded"
        />
      </div>

      <div>
        <label htmlFor="password" className="text-body-medium">
          密码
        </label>
        <input
          id="password"
          type="password"
          value={formData.password}
          onChange={(e) => setFormData({ ...formData, password: e.target.value })}
          required
          className="w-full p-comfortable border rounded"
        />
      </div>

      {error && (
        <div className="text-error-foreground bg-error-background p-comfortable rounded">
          {error}
        </div>
      )}

      <button
        type="submit"
        disabled={loading}
        className="w-full bg-primary-background text-primary-foreground p-comfortable rounded"
      >
        {loading ? '登录中...' : '登录'}
      </button>
    </form>
  );
};

5. 路由保护

// web/apps/lq_label/src/components/protected-route/protected-route.tsx
import React from 'react';
import { Navigate } from 'react-router-dom';
import { useAtomValue } from 'jotai';
import { isAuthenticatedAtom } from '../../atoms/auth-atoms';

interface ProtectedRouteProps {
  children: React.ReactNode;
}

export const ProtectedRoute: React.FC<ProtectedRouteProps> = ({ children }) => {
  const isAuthenticated = useAtomValue(isAuthenticatedAtom);

  if (!isAuthenticated) {
    return <Navigate to="/login" replace />;
  }

  return <>{children}</>;
};

6. 应用路由配置

// web/apps/lq_label/src/app/app.tsx
import { BrowserRouter, Routes, Route } from 'react-router-dom';
import { LoginForm } from '../components/login-form/login-form';
import { RegisterForm } from '../components/register-form/register-form';
import { ProtectedRoute } from '../components/protected-route/protected-route';
import { ProjectList } from '../views/project-list/project-list';

export function App() {
  return (
    <BrowserRouter>
      <Routes>
        <Route path="/login" element={<LoginForm />} />
        <Route path="/register" element={<RegisterForm />} />
        
        <Route
          path="/"
          element={
            <ProtectedRoute>
              <ProjectList />
            </ProtectedRoute>
          }
        />
        
        {/* 其他受保护的路由 */}
      </Routes>
    </BrowserRouter>
  );
}

7. 环境变量配置

# web/apps/lq_label/.env
REACT_APP_API_URL=http://localhost:8000

前端集成检查清单

  • 安装依赖:axios, jotai, jotai/utils, react-router-dom
  • 创建认证 atoms(authTokensAtom, currentUserAtom)
  • 实现认证服务(authService)
  • 配置 axios 拦截器(自动附加 token,处理刷新)
  • 创建登录/注册表单组件
  • 实现路由保护组件(ProtectedRoute)
  • 配置应用路由
  • 设置环境变量
  • 测试完整认证流程