# Design Document ## Overview 本文档描述了标注平台 JWT 权限认证体系的详细设计。该系统采用 FastAPI + JWT 的标准认证方案,使用中间件实现全局鉴权,并为未来的 OAuth 集成预留扩展接口。 ### 设计目标 1. **安全性**:使用 bcrypt 加密密码,JWT 签名验证,防止未授权访问 2. **可扩展性**:服务层架构,便于未来集成 OAuth 认证中心 3. **易用性**:标准的 RESTful API,清晰的错误消息 4. **性能**:中间件高效验证,数据库索引优化 5. **兼容性**:与现有 API 无缝集成,最小化代码改动 ### 技术栈 - **FastAPI**: Web 框架 - **PyJWT**: JWT token 生成和验证 - **bcrypt**: 密码哈希加密 - **SQLite**: 数据库(与现有系统一致) - **Pydantic**: 数据验证和序列化 ## Architecture ### 系统架构图 ``` ┌─────────────────────────────────────────────────────────────┐ │ Frontend (React) │ │ - Login Form │ │ - Token Storage (localStorage) │ │ - HTTP Interceptor (Auto attach token) │ └────────────────────────┬────────────────────────────────────┘ │ HTTP Request (Authorization: Bearer <token>) ▼ ┌─────────────────────────────────────────────────────────────┐ │ FastAPI Application │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ Auth Middleware │ │ │ │ - Extract JWT from header │ │ │ │ - Verify signature & expiration │ │ │ │ - Attach user info to request │ │ │ │ - Skip public endpoints │ │ │ └───────────────────┬───────────────────────────────────┘ │ │ │ │ │ ┌───────────────────▼───────────────────────────────────┐ │ │ │ Router Layer │ │ │ │ - /api/auth/* (Auth Router) │ │ │ │ - /api/projects/* (Protected) │ │ │ │ - /api/tasks/* (Protected) │ │ │ │ - /api/annotations/* (Protected) │ │ │ └───────────────────┬───────────────────────────────────┘ │ │ │ │ │ ┌───────────────────▼───────────────────────────────────┐ │ │ │ Service Layer │ │ │ │ - AuthService (login, register, token) │ │ │ │ - UserService (user CRUD) │ │ │ │ - JWTService (token generation & validation) │ │ │ └───────────────────┬───────────────────────────────────┘ │ │ │ │ │ ┌───────────────────▼───────────────────────────────────┐ │ │ │ Data Layer │ │ │ │ - User Model │ │ │ │ - Database Connection │ │ │ └───────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────┐ 
│ SQLite DB │ │ - users │ └──────────────┘ ``` ### 认证流程 #### 1. 用户注册流程 ``` User → POST /api/auth/register → AuthService.register() → Validate input (username, email, password) → Check username/email uniqueness → Hash password with bcrypt → Create user in database → Return success message ``` #### 2. 用户登录流程 ``` User → POST /api/auth/login → AuthService.login() → Validate credentials → Verify password with bcrypt → Generate Access Token (15 min) → Generate Refresh Token (7 days) → Return tokens + user info ``` #### 3. Token 刷新流程 ``` User → POST /api/auth/refresh → AuthService.refresh_token() → Verify Refresh Token → Extract user info → Generate new Access Token → Generate new Refresh Token (rotation) → Return new tokens ``` #### 4. 受保护端点访问流程 ``` User → GET /api/projects (with Authorization header) → Auth Middleware → Extract token from header → Verify JWT signature → Check expiration → Extract user info → Attach to request.state.user → Continue to route handler → Route handler uses request.state.user ``` ## Components and Interfaces ### 1. 数据库模型 #### User Model (models.py) ```python class User: """User model representing a system user.""" def __init__( self, id: str, username: str, email: str, password_hash: str, role: str, oauth_provider: Optional[str], oauth_id: Optional[str], created_at: datetime ): self.id = id self.username = username self.email = email self.password_hash = password_hash self.role = role self.oauth_provider = oauth_provider # 预留 OAuth self.oauth_id = oauth_id # 预留 OAuth self.created_at = created_at @classmethod def from_row(cls, row): """Create User instance from database row.""" return cls( id=row["id"], username=row["username"], email=row["email"], password_hash=row["password_hash"], role=row["role"], oauth_provider=row.get("oauth_provider"), oauth_id=row.get("oauth_id"), created_at=row["created_at"] ) ``` ### 2. 
Pydantic Schemas #### Auth Schemas (schemas/auth.py) ```python from pydantic import BaseModel, EmailStr, Field from typing import Optional from datetime import datetime class UserRegister(BaseModel): """User registration request schema.""" username: str = Field(..., min_length=3, max_length=50) email: EmailStr password: str = Field(..., min_length=8, max_length=100) class UserLogin(BaseModel): """User login request schema.""" username: str password: str class TokenResponse(BaseModel): """Token response schema.""" access_token: str refresh_token: str token_type: str = "bearer" user: "UserResponse" class TokenRefresh(BaseModel): """Token refresh request schema.""" refresh_token: str class UserResponse(BaseModel): """User response schema.""" id: str username: str email: str role: str created_at: datetime class TokenPayload(BaseModel): """JWT token payload schema.""" sub: str # user_id username: str email: str role: str exp: datetime iat: datetime ``` ### 3. 服务层 #### JWT Service (services/jwt_service.py) ```python from datetime import datetime, timedelta from typing import Dict, Optional import jwt from config import settings class JWTService: """Service for JWT token operations.""" @staticmethod def create_access_token(user_data: Dict) -> str: """ Create access token with 15 minutes expiration. Args: user_data: Dict containing user_id, username, email, role Returns: Encoded JWT token string """ expire = datetime.utcnow() + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) payload = { "sub": user_data["id"], "username": user_data["username"], "email": user_data["email"], "role": user_data["role"], "exp": expire, "iat": datetime.utcnow(), "type": "access" } return jwt.encode( payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM ) @staticmethod def create_refresh_token(user_data: Dict) -> str: """ Create refresh token with 7 days expiration. 
Args: user_data: Dict containing user_id Returns: Encoded JWT token string """ expire = datetime.utcnow() + timedelta( days=settings.REFRESH_TOKEN_EXPIRE_DAYS ) payload = { "sub": user_data["id"], "exp": expire, "iat": datetime.utcnow(), "type": "refresh" } return jwt.encode( payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM ) @staticmethod def verify_token(token: str, token_type: str = "access") -> Optional[Dict]: """ Verify and decode JWT token. Args: token: JWT token string token_type: Expected token type (access or refresh) Returns: Decoded payload dict or None if invalid Raises: jwt.ExpiredSignatureError: Token has expired jwt.InvalidTokenError: Token is invalid """ try: payload = jwt.decode( token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM] ) # Verify token type if payload.get("type") != token_type: return None return payload except jwt.ExpiredSignatureError: raise except jwt.InvalidTokenError: raise ``` #### Auth Service (services/auth_service.py) ```python import bcrypt import uuid from typing import Optional, Dict from database import get_db_connection from models import User from services.jwt_service import JWTService from fastapi import HTTPException, status class AuthService: """Service for authentication operations.""" @staticmethod def register_user(username: str, email: str, password: str) -> User: """ Register a new user. 
Args: username: Unique username email: Unique email address password: Plain text password Returns: Created User object Raises: HTTPException: 409 if username or email already exists """ with get_db_connection() as conn: cursor = conn.cursor() # Check username uniqueness cursor.execute( "SELECT id FROM users WHERE username = ?", (username,) ) if cursor.fetchone(): raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="用户名已被使用" ) # Check email uniqueness cursor.execute( "SELECT id FROM users WHERE email = ?", (email,) ) if cursor.fetchone(): raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="邮箱已被使用" ) # Hash password password_hash = bcrypt.hashpw( password.encode('utf-8'), bcrypt.gensalt() ).decode('utf-8') # Create user user_id = f"user_{uuid.uuid4().hex[:12]}" cursor.execute(""" INSERT INTO users ( id, username, email, password_hash, role ) VALUES (?, ?, ?, ?, ?) """, (user_id, username, email, password_hash, "annotator")) # Fetch created user cursor.execute( "SELECT * FROM users WHERE id = ?", (user_id,) ) row = cursor.fetchone() return User.from_row(row) @staticmethod def login_user(username: str, password: str) -> Dict: """ Authenticate user and generate tokens. 
Args: username: Username password: Plain text password Returns: Dict containing access_token, refresh_token, and user info Raises: HTTPException: 401 if credentials are invalid """ with get_db_connection() as conn: cursor = conn.cursor() # Find user cursor.execute( "SELECT * FROM users WHERE username = ?", (username,) ) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误" ) user = User.from_row(row) # Verify password if not bcrypt.checkpw( password.encode('utf-8'), user.password_hash.encode('utf-8') ): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误" ) # Generate tokens user_data = { "id": user.id, "username": user.username, "email": user.email, "role": user.role } access_token = JWTService.create_access_token(user_data) refresh_token = JWTService.create_refresh_token(user_data) return { "access_token": access_token, "refresh_token": refresh_token, "user": user_data } @staticmethod def refresh_tokens(refresh_token: str) -> Dict: """ Refresh access token using refresh token. 
Args: refresh_token: Valid refresh token Returns: Dict containing new access_token and refresh_token Raises: HTTPException: 401 if refresh token is invalid or expired """ try: payload = JWTService.verify_token(refresh_token, "refresh") if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的刷新令牌" ) user_id = payload["sub"] # Fetch user from database with get_db_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM users WHERE id = ?", (user_id,) ) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在" ) user = User.from_row(row) user_data = { "id": user.id, "username": user.username, "email": user.email, "role": user.role } # Generate new tokens (token rotation) new_access_token = JWTService.create_access_token(user_data) new_refresh_token = JWTService.create_refresh_token(user_data) return { "access_token": new_access_token, "refresh_token": new_refresh_token, "user": user_data } except Exception as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="刷新令牌已过期或无效,请重新登录" ) @staticmethod def get_current_user(user_id: str) -> User: """ Get user by ID. Args: user_id: User unique identifier Returns: User object Raises: HTTPException: 404 if user not found """ with get_db_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM users WHERE id = ?", (user_id,) ) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) return User.from_row(row) ``` ### 4. 中间件 #### Auth Middleware (middleware/auth_middleware.py) ```python from fastapi import Request, HTTPException, status from fastapi.responses import JSONResponse from starlette.middleware.base import BaseHTTPMiddleware from services.jwt_service import JWTService import jwt class AuthMiddleware(BaseHTTPMiddleware): """ Authentication middleware for JWT token verification. 
Validates JWT tokens and attaches user info to request state. """ # Public endpoints that don't require authentication PUBLIC_PATHS = { "/", "/health", "/docs", "/openapi.json", "/redoc", "/api/auth/register", "/api/auth/login", "/api/auth/refresh", } async def dispatch(self, request: Request, call_next): """ Process each request through authentication. Args: request: FastAPI Request object call_next: Next middleware or route handler Returns: Response from next handler or error response """ # Skip authentication for public paths if request.url.path in self.PUBLIC_PATHS: return await call_next(request) # Skip authentication for OPTIONS requests (CORS preflight) if request.method == "OPTIONS": return await call_next(request) # Extract token from Authorization header auth_header = request.headers.get("Authorization") if not auth_header: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "缺少认证令牌", "error_type": "missing_token" } ) # Verify Bearer token format parts = auth_header.split() if len(parts) != 2 or parts[0].lower() != "bearer": return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "无效的认证令牌格式", "error_type": "invalid_token_format" } ) token = parts[1] try: # Verify and decode token payload = JWTService.verify_token(token, "access") if not payload: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "无效的认证令牌", "error_type": "invalid_token" } ) # Attach user info to request state request.state.user = { "id": payload["sub"], "username": payload["username"], "email": payload["email"], "role": payload["role"] } # Continue to route handler response = await call_next(request) return response except jwt.ExpiredSignatureError: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "认证令牌已过期", "error_type": "token_expired" } ) except jwt.InvalidTokenError: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "无效的认证令牌", 
"error_type": "invalid_token" } ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ "detail": "认证过程发生错误", "error_type": "auth_error" } ) def require_role(*allowed_roles: str): """ Decorator to check user role. Usage: @require_role("admin", "annotator") async def my_endpoint(request: Request): ... Args: allowed_roles: Tuple of allowed role names Returns: Decorator function """ def decorator(func): async def wrapper(request: Request, *args, **kwargs): user = getattr(request.state, "user", None) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证" ) if user["role"] not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="权限不足" ) return await func(request, *args, **kwargs) return wrapper return decorator ``` ### 5. 路由 #### Auth Router (routers/auth.py) ```python from fastapi import APIRouter, HTTPException, status, Request from schemas.auth import ( UserRegister, UserLogin, TokenResponse, TokenRefresh, UserResponse ) from services.auth_service import AuthService router = APIRouter( prefix="/api/auth", tags=["authentication"] ) @router.post("/register", status_code=status.HTTP_201_CREATED) async def register(user_data: UserRegister): """ Register a new user. Args: user_data: User registration data Returns: Success message Raises: HTTPException: 409 if username or email already exists HTTPException: 400 if validation fails """ user = AuthService.register_user( username=user_data.username, email=user_data.email, password=user_data.password ) return { "message": "用户注册成功", "user_id": user.id } @router.post("/login", response_model=TokenResponse) async def login(credentials: UserLogin): """ Authenticate user and return JWT tokens. 
Args: credentials: User login credentials Returns: Access token, refresh token, and user info Raises: HTTPException: 401 if credentials are invalid """ result = AuthService.login_user( username=credentials.username, password=credentials.password ) return TokenResponse( access_token=result["access_token"], refresh_token=result["refresh_token"], user=UserResponse(**result["user"]) ) @router.post("/refresh", response_model=TokenResponse) async def refresh_token(token_data: TokenRefresh): """ Refresh access token using refresh token. Args: token_data: Refresh token Returns: New access token and refresh token Raises: HTTPException: 401 if refresh token is invalid or expired """ result = AuthService.refresh_tokens(token_data.refresh_token) return TokenResponse( access_token=result["access_token"], refresh_token=result["refresh_token"], user=UserResponse(**result["user"]) ) @router.get("/me", response_model=UserResponse) async def get_current_user(request: Request): """ Get current authenticated user info. Args: request: FastAPI Request with user info in state Returns: Current user information Raises: HTTPException: 401 if not authenticated """ user_data = getattr(request.state, "user", None) if not user_data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证" ) user = AuthService.get_current_user(user_data["id"]) return UserResponse( id=user.id, username=user.username, email=user.email, role=user.role, created_at=user.created_at ) ``` ### 6. 
配置 #### Config (config.py) ```python import os import secrets from pydantic_settings import BaseSettings class Settings(BaseSettings): """Application settings.""" # JWT Settings JWT_SECRET_KEY: str = os.getenv( "JWT_SECRET_KEY", secrets.token_urlsafe(32) ) JWT_ALGORITHM: str = os.getenv("JWT_ALGORITHM", "HS256") ACCESS_TOKEN_EXPIRE_MINUTES: int = int( os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "15") ) REFRESH_TOKEN_EXPIRE_DAYS: int = int( os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7") ) # Database Settings DATABASE_PATH: str = os.getenv( "DATABASE_PATH", "annotation_platform.db" ) # OAuth Settings (预留) OAUTH_ENABLED: bool = os.getenv("OAUTH_ENABLED", "false").lower() == "true" OAUTH_PROVIDER_URL: str = os.getenv("OAUTH_PROVIDER_URL", "") OAUTH_CLIENT_ID: str = os.getenv("OAUTH_CLIENT_ID", "") OAUTH_CLIENT_SECRET: str = os.getenv("OAUTH_CLIENT_SECRET", "") class Config: env_file = ".env" case_sensitive = True settings = Settings() # Warn if using default JWT secret if settings.JWT_SECRET_KEY == secrets.token_urlsafe(32): import logging logging.warning( "使用默认生成的 JWT_SECRET_KEY,生产环境请设置环境变量" ) ``` ## Data Models ### Database Schema #### Users Table ```sql CREATE TABLE IF NOT EXISTS users ( id TEXT PRIMARY KEY, username TEXT NOT NULL UNIQUE, email TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'annotator', oauth_provider TEXT, -- 预留: 'google', 'github', etc. 
oauth_id TEXT, -- 预留: OAuth provider user ID created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Indexes for performance CREATE INDEX IF NOT EXISTS idx_users_username ON users(username); CREATE INDEX IF NOT EXISTS idx_users_email ON users(email); CREATE INDEX IF NOT EXISTS idx_users_oauth ON users(oauth_provider, oauth_id); ``` #### 字段说明 | 字段 | 类型 | 说明 | 约束 | |------|------|------|------| | id | TEXT | 用户唯一标识 | PRIMARY KEY, 格式: user_xxxxx | | username | TEXT | 用户名 | NOT NULL, UNIQUE, 3-50字符 | | email | TEXT | 邮箱地址 | NOT NULL, UNIQUE, 有效邮箱格式 | | password_hash | TEXT | bcrypt 加密的密码 | NOT NULL | | role | TEXT | 用户角色 | NOT NULL, 枚举: admin/annotator/viewer | | oauth_provider | TEXT | OAuth 提供商 | NULLABLE, 预留字段 | | oauth_id | TEXT | OAuth 用户 ID | NULLABLE, 预留字段 | | created_at | TIMESTAMP | 创建时间 | DEFAULT CURRENT_TIMESTAMP | | updated_at | TIMESTAMP | 更新时间 | DEFAULT CURRENT_TIMESTAMP | ### JWT Token Structure #### Access Token Payload ```json { "sub": "user_abc123def456", "username": "john_doe", "email": "john@example.com", "role": "annotator", "type": "access", "iat": 1704067200, "exp": 1704068100 } ``` #### Refresh Token Payload ```json { "sub": "user_abc123def456", "type": "refresh", "iat": 1704067200, "exp": 1704672000 } ``` ### API Request/Response Examples #### 注册请求 ```json POST /api/auth/register Content-Type: application/json { "username": "john_doe", "email": "john@example.com", "password": "SecurePass123!" } ``` #### 注册响应 ```json HTTP/1.1 201 Created Content-Type: application/json { "message": "用户注册成功", "user_id": "user_abc123def456" } ``` #### 登录请求 ```json POST /api/auth/login Content-Type: application/json { "username": "john_doe", "password": "SecurePass123!" 
} ``` #### 登录响应 ```json HTTP/1.1 200 OK Content-Type: application/json { "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type": "bearer", "user": { "id": "user_abc123def456", "username": "john_doe", "email": "john@example.com", "role": "annotator", "created_at": "2024-01-01T00:00:00Z" } } ``` #### 受保护端点请求 ```http GET /api/projects Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... ``` ## Correctness Properties *属性(Property)是系统在所有有效执行中应该保持为真的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性作为人类可读规范和机器可验证正确性保证之间的桥梁。* ### Property 1: 用户注册创建有效用户 *对于任何*有效的用户名、邮箱和密码组合,注册后应该在数据库中创建一个新用户记录,且该记录包含正确的用户名、邮箱和加密后的密码。 **Validates: Requirements 1.1, 1.5** ### Property 2: 密码哈希不可逆 *对于任何*注册的用户,数据库中存储的 password_hash 字段应该不等于原始密码,且应该是有效的 bcrypt 哈希格式(以 $2b$ 开头)。 **Validates: Requirements 1.5, 7.1** ### Property 3: 短密码被拒绝 *对于任何*长度小于 8 个字符的密码字符串,注册请求应该返回 400 错误,且不应该在数据库中创建用户记录。 **Validates: Requirements 1.4** ### Property 4: 新用户默认角色 *对于任何*新注册的用户,其角色字段应该自动设置为 "annotator"。 **Validates: Requirements 1.6** ### Property 5: 登录返回有效 Tokens *对于任何*已注册的用户,使用正确的用户名和密码登录应该返回包含 access_token、refresh_token 和 user 信息的响应,且两个 token 都应该是有效的 JWT 格式。 **Validates: Requirements 2.1, 2.6** ### Property 6: Access Token 包含完整用户信息 *对于任何*登录成功返回的 access_token,解码后的 payload 应该包含 sub(用户ID)、username、email 和 role 字段。 **Validates: Requirements 2.3** ### Property 7: Token 过期时间正确设置 *对于任何*生成的 access_token,其过期时间(exp)与签发时间(iat)的差值应该等于配置的 ACCESS_TOKEN_EXPIRE_MINUTES;对于任何 refresh_token,差值应该等于配置的 REFRESH_TOKEN_EXPIRE_DAYS。 **Validates: Requirements 2.4, 2.5** ### Property 8: Token 刷新生成新令牌 *对于任何*有效的 refresh_token,使用它刷新应该返回新的 access_token 和 refresh_token,且新的 refresh_token 应该与旧的不同(token rotation)。 **Validates: Requirements 3.1, 3.4** ### Property 9: 中间件验证有效 Token *对于任何*受保护的端点和有效的 JWT access_token,请求应该成功通过中间件验证,且 request.state.user 应该包含正确的用户信息(id、username、email、role)。 **Validates: Requirements 4.1, 4.5** ### Property 10: 用户信息端点返回正确数据 *对于任何*已认证的用户,请求 /api/auth/me 
端点应该返回该用户的完整信息(id、username、email、role、created_at),且信息应该与数据库中的记录一致。 **Validates: Requirements 6.1** ### Property 11: 现有 API 端点受保护 *对于任何* /api/projects、/api/tasks 或 /api/annotations 端点,不带有效 token 的请求应该返回 401 错误,带有效 token 的请求应该成功处理。 **Validates: Requirements 11.1, 11.2, 11.3** ### Property 12: 角色权限控制 *对于任何*需要特定角色的端点,只有具有允许角色的用户才能成功访问,其他角色的用户应该收到 403 错误。 **Validates: Requirements 12.2, 12.3, 12.4** ## Error Handling ### 错误响应格式 所有错误响应遵循统一的 JSON 格式: ```json { "detail": "错误描述信息", "error_type": "error_category" } ``` ### 错误类型分类 #### 1. 认证错误 (401 Unauthorized) | Error Type | Detail | 场景 | |------------|--------|------| | missing_token | 缺少认证令牌 | Authorization header 不存在 | | invalid_token_format | 无效的认证令牌格式 | 不是 `"Bearer <token>"` 格式 | | invalid_token | 无效的认证令牌 | JWT 签名验证失败 | | token_expired | 认证令牌已过期 | JWT 过期时间已过 | | invalid_credentials | 用户名或密码错误 | 登录凭据不正确 | | user_not_found | 用户不存在 | 刷新 token 时用户已被删除 | #### 2. 授权错误 (403 Forbidden) | Error Type | Detail | 场景 | |------------|--------|------| | insufficient_permissions | 权限不足 | 用户角色不满足端点要求 | #### 3. 客户端错误 (400 Bad Request) | Error Type | Detail | 场景 | |------------|--------|------| | validation_error | 输入验证失败 | Pydantic 验证失败 | | password_too_short | 密码长度不足 | 密码少于 8 个字符 | | invalid_email | 无效的邮箱格式 | 邮箱格式不正确 | 注:FastAPI 默认对请求校验失败(`RequestValidationError`)返回 422;要按本表返回 400,需要注册自定义异常处理器将其转换为 400 及上述统一错误格式。 #### 4. 冲突错误 (409 Conflict) | Error Type | Detail | 场景 | |------------|--------|------| | username_exists | 用户名已被使用 | 注册时用户名重复 | | email_exists | 邮箱已被使用 | 注册时邮箱重复 | #### 5. 服务器错误 (500 Internal Server Error) | Error Type | Detail | 场景 | |------------|--------|------| | auth_error | 认证过程发生错误 | 中间件内部错误 | | database_error | 数据库操作失败 | 数据库连接或查询错误 | | internal_error | 服务器内部错误 | 未预期的异常 | ### 错误处理策略 #### 1. 日志记录 ```python import logging logger = logging.getLogger(__name__) # 记录认证失败(不记录密码) logger.warning( f"Login failed for username: {username}", extra={"username": username, "ip": request.client.host} ) # 记录服务器错误 logger.error( f"Authentication error: {str(e)}", exc_info=True, extra={"user_id": user_id} ) ``` #### 2. 
异常处理层次 ``` FastAPI Exception Handlers ↓ Custom Exception Classes ↓ Service Layer Exceptions ↓ Database Exceptions ``` #### 3. 自定义异常类 ```python class AuthenticationError(Exception): """Base authentication exception.""" pass class InvalidCredentialsError(AuthenticationError): """Invalid username or password.""" pass class TokenExpiredError(AuthenticationError): """JWT token has expired.""" pass class InsufficientPermissionsError(AuthenticationError): """User lacks required permissions.""" pass ``` ### 安全考虑 1. **密码错误不泄露信息**:用户名不存在和密码错误返回相同的错误消息 2. **限制错误详情**:生产环境不暴露内部错误堆栈 3. **速率限制**:登录失败次数限制(未来实现) 4. **审计日志**:记录所有认证相关的操作 ## Testing Strategy ### 测试方法 本项目采用**双重测试方法**,结合单元测试和属性测试,确保全面的代码覆盖和正确性验证。 #### 单元测试 (Unit Tests) - 验证特定示例和边界情况 - 测试错误条件和异常处理 - 测试集成点和组件交互 - 使用 pytest 框架 #### 属性测试 (Property-Based Tests) - 验证跨所有输入的通用属性 - 通过随机化实现全面的输入覆盖 - 使用 Hypothesis 库 - 每个属性测试最少 100 次迭代 ### 测试框架和工具 ```python # requirements.txt pytest==7.4.3 pytest-asyncio==0.21.1 hypothesis==6.92.1 faker==20.1.0 # 生成测试数据 ``` ### 测试文件组织 ``` backend/test/ ├── __init__.py ├── conftest.py # pytest fixtures ├── test_auth_service.py # AuthService 单元测试 ├── test_jwt_service.py # JWTService 单元测试 ├── test_auth_middleware.py # 中间件单元测试 ├── test_auth_router.py # 路由集成测试 ├── test_properties_auth.py # 属性测试 └── test_integration_auth.py # 端到端集成测试 ``` ### 测试配置 #### pytest.ini ```ini [pytest] testpaths = test python_files = test_*.py python_classes = Test* python_functions = test_* asyncio_mode = auto markers = unit: Unit tests integration: Integration tests property: Property-based tests slow: Slow running tests ``` #### conftest.py (测试 Fixtures) ```python import pytest import os from database import init_database, get_db_connection from fastapi.testclient import TestClient from main import app @pytest.fixture(scope="function") def test_db(): """Create a test database for each test.""" # Use in-memory database for tests os.environ["DATABASE_PATH"] = ":memory:" init_database() yield # Cleanup handled by in-memory 
database @pytest.fixture def client(test_db): """Create a test client.""" return TestClient(app) @pytest.fixture def sample_user_data(): """Sample user data for testing.""" return { "username": "testuser", "email": "test@example.com", "password": "TestPass123!" } @pytest.fixture def registered_user(client, sample_user_data): """Register a user and return the data.""" response = client.post("/api/auth/register", json=sample_user_data) assert response.status_code == 201 return sample_user_data @pytest.fixture def auth_token(client, registered_user): """Get authentication token for a registered user.""" response = client.post("/api/auth/login", json={ "username": registered_user["username"], "password": registered_user["password"] }) assert response.status_code == 200 return response.json()["access_token"] ``` ### 单元测试示例 #### 测试用户注册 ```python # test/test_auth_service.py import pytest from services.auth_service import AuthService from fastapi import HTTPException @pytest.mark.unit def test_register_user_success(test_db, sample_user_data): """Test successful user registration.""" user = AuthService.register_user( username=sample_user_data["username"], email=sample_user_data["email"], password=sample_user_data["password"] ) assert user.username == sample_user_data["username"] assert user.email == sample_user_data["email"] assert user.role == "annotator" assert user.password_hash != sample_user_data["password"] assert user.password_hash.startswith("$2b$") @pytest.mark.unit def test_register_duplicate_username(test_db, sample_user_data): """Test registration with duplicate username.""" # Register first user AuthService.register_user(**sample_user_data) # Try to register with same username with pytest.raises(HTTPException) as exc_info: AuthService.register_user( username=sample_user_data["username"], email="different@example.com", password=sample_user_data["password"] ) assert exc_info.value.status_code == 409 assert "用户名已被使用" in exc_info.value.detail @pytest.mark.unit def 
test_register_short_password(test_db): """Test registration with password less than 8 characters.""" # This should be caught by Pydantic validation # Test at router level pass ``` #### 测试 JWT Token ```python # test/test_jwt_service.py import pytest import jwt from datetime import datetime, timedelta from services.jwt_service import JWTService from config import settings @pytest.mark.unit def test_create_access_token(): """Test access token creation.""" user_data = { "id": "user_123", "username": "testuser", "email": "test@example.com", "role": "annotator" } token = JWTService.create_access_token(user_data) # Decode and verify payload = jwt.decode( token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM] ) assert payload["sub"] == user_data["id"] assert payload["username"] == user_data["username"] assert payload["email"] == user_data["email"] assert payload["role"] == user_data["role"] assert payload["type"] == "access" @pytest.mark.unit def test_verify_expired_token(): """Test verification of expired token.""" user_data = {"id": "user_123"} # Create token with past expiration expire = datetime.utcnow() - timedelta(minutes=1) payload = { "sub": user_data["id"], "exp": expire, "iat": datetime.utcnow(), "type": "access" } token = jwt.encode( payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM ) # Should raise ExpiredSignatureError with pytest.raises(jwt.ExpiredSignatureError): JWTService.verify_token(token, "access") ``` ### 属性测试示例 #### 使用 Hypothesis 进行属性测试 ```python # test/test_properties_auth.py import pytest from hypothesis import given, strategies as st, settings from services.auth_service import AuthService from services.jwt_service import JWTService import bcrypt import jwt as pyjwt # Custom strategies for generating test data username_strategy = st.text( alphabet=st.characters(whitelist_categories=('Lu', 'Ll', 'Nd')), min_size=3, max_size=50 ) email_strategy = st.emails() password_strategy = st.text(min_size=8, max_size=100) 
@pytest.mark.property @settings(max_examples=100) @given( username=username_strategy, email=email_strategy, password=password_strategy ) def test_property_password_hash_not_reversible(test_db, username, email, password): """ Property 2: 密码哈希不可逆 Feature: jwt-authentication, Property 2: 对于任何注册的用户, 数据库中存储的 password_hash 字段应该不等于原始密码, 且应该是有效的 bcrypt 哈希格式。 """ try: user = AuthService.register_user(username, email, password) # Password hash should not equal original password assert user.password_hash != password # Should be valid bcrypt format assert user.password_hash.startswith("$2b$") # Should be able to verify with bcrypt assert bcrypt.checkpw( password.encode('utf-8'), user.password_hash.encode('utf-8') ) except Exception: # Skip if registration fails due to constraints pass @pytest.mark.property @settings(max_examples=100) @given(password=st.text(min_size=0, max_size=7)) def test_property_short_passwords_rejected(test_db, password): """ Property 3: 短密码被拒绝 Feature: jwt-authentication, Property 3: 对于任何长度小于 8 个字符的密码字符串, 注册请求应该返回 400 错误。 """ # This should be tested at router level with TestClient # since Pydantic validation happens at request parsing pass @pytest.mark.property @settings(max_examples=100) @given( username=username_strategy, email=email_strategy, password=password_strategy ) def test_property_new_user_default_role(test_db, username, email, password): """ Property 4: 新用户默认角色 Feature: jwt-authentication, Property 4: 对于任何新注册的用户, 其角色字段应该自动设置为 "annotator"。 """ try: user = AuthService.register_user(username, email, password) assert user.role == "annotator" except Exception: # Skip if registration fails pass @pytest.mark.property @settings(max_examples=100) @given( username=username_strategy, email=email_strategy, password=password_strategy ) def test_property_login_returns_valid_tokens(test_db, username, email, password): """ Property 5: 登录返回有效 Tokens Feature: jwt-authentication, Property 5: 对于任何已注册的用户, 使用正确的用户名和密码登录应该返回有效的 JWT tokens。 """ try: # Register user 
AuthService.register_user(username, email, password) # Login result = AuthService.login_user(username, password) # Should have tokens assert "access_token" in result assert "refresh_token" in result assert "user" in result # Tokens should be valid JWT format access_payload = pyjwt.decode( result["access_token"], options={"verify_signature": False} ) refresh_payload = pyjwt.decode( result["refresh_token"], options={"verify_signature": False} ) assert access_payload["type"] == "access" assert refresh_payload["type"] == "refresh" except Exception: # Skip if registration or login fails pass @pytest.mark.property @settings(max_examples=100) def test_property_access_token_contains_user_info(test_db, sample_user_data): """ Property 6: Access Token 包含完整用户信息 Feature: jwt-authentication, Property 6: 对于任何登录成功返回的 access_token, 解码后的 payload 应该包含完整的用户信息。 """ # Register and login AuthService.register_user(**sample_user_data) result = AuthService.login_user( sample_user_data["username"], sample_user_data["password"] ) # Decode token payload = pyjwt.decode( result["access_token"], options={"verify_signature": False} ) # Should contain all required fields assert "sub" in payload # user_id assert "username" in payload assert "email" in payload assert "role" in payload assert payload["username"] == sample_user_data["username"] assert payload["email"] == sample_user_data["email"] @pytest.mark.property @settings(max_examples=100) def test_property_token_expiration_times(test_db, sample_user_data): """ Property 7: Token 过期时间正确设置 Feature: jwt-authentication, Property 7: 对于任何生成的 token, 其过期时间应该符合配置的值。 """ from config import settings from datetime import datetime # Register and login AuthService.register_user(**sample_user_data) result = AuthService.login_user( sample_user_data["username"], sample_user_data["password"] ) # Check access token expiration access_payload = pyjwt.decode( result["access_token"], options={"verify_signature": False} ) access_exp = 
datetime.fromtimestamp(access_payload["exp"]) access_iat = datetime.fromtimestamp(access_payload["iat"]) access_diff = (access_exp - access_iat).total_seconds() / 60 # Should be approximately ACCESS_TOKEN_EXPIRE_MINUTES assert abs(access_diff - settings.ACCESS_TOKEN_EXPIRE_MINUTES) < 1 # Check refresh token expiration refresh_payload = pyjwt.decode( result["refresh_token"], options={"verify_signature": False} ) refresh_exp = datetime.fromtimestamp(refresh_payload["exp"]) refresh_iat = datetime.fromtimestamp(refresh_payload["iat"]) refresh_diff = (refresh_exp - refresh_iat).total_seconds() / (60 * 60 * 24) # Should be approximately REFRESH_TOKEN_EXPIRE_DAYS assert abs(refresh_diff - settings.REFRESH_TOKEN_EXPIRE_DAYS) < 0.1 @pytest.mark.property @settings(max_examples=100) def test_property_token_refresh_rotation(test_db, sample_user_data): """ Property 8: Token 刷新生成新令牌 Feature: jwt-authentication, Property 8: 对于任何有效的 refresh_token, 刷新应该返回新的 tokens,且新的 refresh_token 与旧的不同。 """ # Register and login AuthService.register_user(**sample_user_data) result1 = AuthService.login_user( sample_user_data["username"], sample_user_data["password"] ) old_refresh_token = result1["refresh_token"] # Refresh tokens result2 = AuthService.refresh_tokens(old_refresh_token) new_access_token = result2["access_token"] new_refresh_token = result2["refresh_token"] # Should have new tokens assert new_access_token != result1["access_token"] assert new_refresh_token != old_refresh_token # New tokens should be valid access_payload = pyjwt.decode( new_access_token, options={"verify_signature": False} ) assert access_payload["type"] == "access" ``` ### 集成测试示例 ```python # test/test_integration_auth.py import pytest from fastapi.testclient import TestClient @pytest.mark.integration def test_full_authentication_flow(client): """Test complete authentication flow from registration to protected endpoint access.""" # 1. 
Register user register_data = { "username": "integrationuser", "email": "integration@example.com", "password": "IntegrationPass123!" } response = client.post("/api/auth/register", json=register_data) assert response.status_code == 201 # 2. Login login_data = { "username": register_data["username"], "password": register_data["password"] } response = client.post("/api/auth/login", json=login_data) assert response.status_code == 200 tokens = response.json() access_token = tokens["access_token"] refresh_token = tokens["refresh_token"] # 3. Access protected endpoint headers = {"Authorization": f"Bearer {access_token}"} response = client.get("/api/auth/me", headers=headers) assert response.status_code == 200 user_info = response.json() assert user_info["username"] == register_data["username"] # 4. Access protected project endpoint response = client.get("/api/projects", headers=headers) assert response.status_code == 200 # 5. Refresh tokens response = client.post( "/api/auth/refresh", json={"refresh_token": refresh_token} ) assert response.status_code == 200 new_tokens = response.json() assert new_tokens["access_token"] != access_token # 6. 
Use new token new_headers = {"Authorization": f"Bearer {new_tokens['access_token']}"} response = client.get("/api/auth/me", headers=new_headers) assert response.status_code == 200 @pytest.mark.integration def test_middleware_protects_endpoints(client, auth_token): """ Property 11: 现有 API 端点受保护 Feature: jwt-authentication, Property 11: 对于任何受保护的端点, 不带 token 的请求应该返回 401,带有效 token 的请求应该成功。 """ protected_endpoints = [ "/api/projects", "/api/tasks", "/api/annotations" ] for endpoint in protected_endpoints: # Without token - should fail response = client.get(endpoint) assert response.status_code == 401 # With token - should succeed headers = {"Authorization": f"Bearer {auth_token}"} response = client.get(endpoint, headers=headers) assert response.status_code == 200 @pytest.mark.integration def test_public_endpoints_accessible(client): """Test that public endpoints don't require authentication.""" public_endpoints = [ "/", "/health", "/docs", "/openapi.json" ] for endpoint in public_endpoints: response = client.get(endpoint) assert response.status_code in [200, 404] # 404 is ok if endpoint doesn't exist ``` ### 测试覆盖率目标 - **代码覆盖率**: 最低 80% - **分支覆盖率**: 最低 70% - **关键路径覆盖**: 100%(认证、授权、token 生成) ### 运行测试 ```bash # 运行所有测试 pytest # 运行单元测试 pytest -m unit # 运行属性测试 pytest -m property # 运行集成测试 pytest -m integration # 生成覆盖率报告 pytest --cov=. --cov-report=html # 运行特定测试文件 pytest test/test_auth_service.py # 详细输出 pytest -v # 显示打印输出 pytest -s ``` ## 前端集成指南 ### React 前端实现概述 前端需要实现以下功能来与 JWT 认证系统集成: 1. 登录/注册表单 2. Token 存储和管理 3. HTTP 请求拦截器(自动附加 token) 4. Token 刷新逻辑 5. 认证状态管理(使用 Jotai) 6. 路由保护 ### 1. 
认证 Atoms (Jotai 状态管理) ```typescript // web/apps/lq_label/src/atoms/auth-atoms.ts import { atom } from 'jotai'; import { atomWithStorage } from 'jotai/utils'; interface User { id: string; username: string; email: string; role: string; created_at: string; } interface AuthTokens { access_token: string; refresh_token: string; } // 使用 localStorage 持久化 tokens export const authTokensAtom = atomWithStorage<AuthTokens | null>( 'auth_tokens', null ); // 当前用户信息 export const currentUserAtom = atomWithStorage<User | null>( 'current_user', null ); // 认证状态 export const isAuthenticatedAtom = atom( (get) => { const tokens = get(authTokensAtom); return tokens !== null && tokens.access_token !== ''; } ); // 登出 action export const logoutAtom = atom( null, (get, set) => { set(authTokensAtom, null); set(currentUserAtom, null); } ); ``` ### 2. 认证 API 服务 ```typescript // web/apps/lq_label/src/services/auth-service.ts import axios from 'axios'; const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000'; export interface RegisterData { username: string; email: string; password: string; } export interface LoginData { username: string; password: string; } export interface AuthResponse { access_token: string; refresh_token: string; token_type: string; user: { id: string; username: string; email: string; role: string; created_at: string; }; } export const authService = { async register(data: RegisterData): Promise<{ message: string; user_id: string }> { const response = await axios.post(`${API_BASE_URL}/api/auth/register`, data); return response.data; }, async login(data: LoginData): Promise<AuthResponse> { const response = await axios.post(`${API_BASE_URL}/api/auth/login`, data); return response.data; }, async refreshToken(refreshToken: string): Promise<AuthResponse> { const response = await axios.post(`${API_BASE_URL}/api/auth/refresh`, { refresh_token: refreshToken, }); return response.data; }, async getCurrentUser(accessToken: string): Promise<AuthResponse['user']> { const response = await axios.get(`${API_BASE_URL}/api/auth/me`, { headers: {
Authorization: `Bearer ${accessToken}`, }, }); return response.data; }, }; ``` ### 3. Axios 拦截器配置 ```typescript // web/apps/lq_label/src/utils/axios-config.ts import axios, { AxiosError, InternalAxiosRequestConfig } from 'axios'; import { authTokensAtom, logoutAtom } from '../atoms/auth-atoms'; import { authService } from '../services/auth-service'; import { getDefaultStore } from 'jotai'; const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000'; // 创建 axios 实例 export const apiClient = axios.create({ baseURL: API_BASE_URL, headers: { 'Content-Type': 'application/json', }, }); // Jotai store const store = getDefaultStore(); // 请求拦截器:自动附加 access token apiClient.interceptors.request.use( (config: InternalAxiosRequestConfig) => { const tokens = store.get(authTokensAtom); if (tokens?.access_token) { config.headers.Authorization = `Bearer ${tokens.access_token}`; } return config; }, (error) => { return Promise.reject(error); } ); // 响应拦截器:处理 token 过期 let isRefreshing = false; let failedQueue: Array<{ resolve: (value?: unknown) => void; reject: (reason?: unknown) => void; }> = []; const processQueue = (error: Error | null, token: string | null = null) => { failedQueue.forEach((prom) => { if (error) { prom.reject(error); } else { prom.resolve(token); } }); failedQueue = []; }; apiClient.interceptors.response.use( (response) => response, async (error: AxiosError) => { const originalRequest = error.config as InternalAxiosRequestConfig & { _retry?: boolean; }; // 如果是 401 错误且不是登录/注册请求 if ( error.response?.status === 401 && originalRequest && !originalRequest._retry && !originalRequest.url?.includes('/auth/login') && !originalRequest.url?.includes('/auth/register') ) { if (isRefreshing) { // 如果正在刷新,将请求加入队列 return new Promise((resolve, reject) => { failedQueue.push({ resolve, reject }); }) .then((token) => { originalRequest.headers.Authorization = `Bearer ${token}`; return apiClient(originalRequest); }) .catch((err) => { return Promise.reject(err); }); } 
originalRequest._retry = true; isRefreshing = true; const tokens = store.get(authTokensAtom); if (!tokens?.refresh_token) { // 没有 refresh token,直接登出 store.set(logoutAtom); return Promise.reject(error); } try { // 尝试刷新 token const response = await authService.refreshToken(tokens.refresh_token); // 更新 tokens store.set(authTokensAtom, { access_token: response.access_token, refresh_token: response.refresh_token, }); // 处理队列中的请求 processQueue(null, response.access_token); // 重试原始请求 originalRequest.headers.Authorization = `Bearer ${response.access_token}`; return apiClient(originalRequest); } catch (refreshError) { // 刷新失败,登出用户 processQueue(refreshError as Error, null); store.set(logoutAtom); return Promise.reject(refreshError); } finally { isRefreshing = false; } } return Promise.reject(error); } ); ``` ### 4. 登录组件 ```typescript // web/apps/lq_label/src/components/login-form/login-form.tsx import React, { useState } from 'react'; import { useAtom } from 'jotai'; import { useNavigate } from 'react-router-dom'; import { authTokensAtom, currentUserAtom } from '../../atoms/auth-atoms'; import { authService } from '../../services/auth-service'; export const LoginForm: React.FC = () => { const [, setTokens] = useAtom(authTokensAtom); const [, setCurrentUser] = useAtom(currentUserAtom); const navigate = useNavigate(); const [formData, setFormData] = useState({ username: '', password: '', }); const [error, setError] = useState<string | null>(null); const [loading, setLoading] = useState(false); const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); setError(null); setLoading(true); try { const response = await authService.login(formData); // 保存 tokens 和用户信息 setTokens({ access_token: response.access_token, refresh_token: response.refresh_token, }); setCurrentUser(response.user); // 跳转到主页 navigate('/'); } catch (err: any) { setError(err.response?.data?.detail || '登录失败,请重试'); } finally { setLoading(false); } }; return (
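<form onSubmit={handleSubmit}>
<input type="text" value={formData.username} onChange={(e) => setFormData({ ...formData, username: e.target.value })} required className="w-full p-comfortable border rounded" />
<input type="password" value={formData.password} onChange={(e) => setFormData({ ...formData, password: e.target.value })} required className="w-full p-comfortable border rounded" />
{error && (
<div role="alert">{error}</div>
)}
<button type="submit" disabled={loading}>登录</button>
</form>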
 ); }; ``` ### 5. 路由保护 ```typescript // web/apps/lq_label/src/components/protected-route/protected-route.tsx import React from 'react'; import { Navigate } from 'react-router-dom'; import { useAtomValue } from 'jotai'; import { isAuthenticatedAtom } from '../../atoms/auth-atoms'; interface ProtectedRouteProps { children: React.ReactNode; } export const ProtectedRoute: React.FC<ProtectedRouteProps> = ({ children }) => { const isAuthenticated = useAtomValue(isAuthenticatedAtom); if (!isAuthenticated) { return <Navigate to="/login" replace />; } return <>{children}</>; }; ``` ### 6. 应用路由配置 ```typescript // web/apps/lq_label/src/app/app.tsx import { BrowserRouter, Routes, Route } from 'react-router-dom'; import { LoginForm } from '../components/login-form/login-form'; import { RegisterForm } from '../components/register-form/register-form'; import { ProtectedRoute } from '../components/protected-route/protected-route'; import { ProjectList } from '../views/project-list/project-list'; export function App() { return ( <BrowserRouter> <Routes> <Route path="/login" element={<LoginForm />} /> <Route path="/register" element={<RegisterForm />} /> <Route path="/" element={<ProtectedRoute><ProjectList /></ProtectedRoute>} /> {/* 其他受保护的路由 */} </Routes> </BrowserRouter> ); } ``` ### 7. 环境变量配置 ```bash # web/apps/lq_label/.env REACT_APP_API_URL=http://localhost:8000 ``` ### 前端集成检查清单 - [ ] 安装依赖:axios, jotai(已包含 jotai/utils,无需单独安装), react-router-dom - [ ] 创建认证 atoms(authTokensAtom, currentUserAtom) - [ ] 实现认证服务(authService) - [ ] 配置 axios 拦截器(自动附加 token,处理刷新) - [ ] 创建登录/注册表单组件 - [ ] 实现路由保护组件(ProtectedRoute) - [ ] 配置应用路由 - [ ] 设置环境变量 - [ ] 测试完整认证流程