本文档描述了标注平台 JWT 权限认证体系的详细设计。该系统采用 FastAPI + JWT 的标准认证方案,使用中间件实现全局鉴权,并为未来的 OAuth 集成预留扩展接口。
┌─────────────────────────────────────────────────────────────┐
│ Frontend (React) │
│ - Login Form │
│ - Token Storage (localStorage) │
│ - HTTP Interceptor (Auto attach token) │
└────────────────────────┬────────────────────────────────────┘
│ HTTP Request (Authorization: Bearer <token>)
▼
┌─────────────────────────────────────────────────────────────┐
│ FastAPI Application │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Auth Middleware │ │
│ │ - Extract JWT from header │ │
│ │ - Verify signature & expiration │ │
│ │ - Attach user info to request │ │
│ │ - Skip public endpoints │ │
│ └───────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌───────────────────▼───────────────────────────────────┐ │
│ │ Router Layer │ │
│ │ - /api/auth/* (Auth Router) │ │
│ │ - /api/projects/* (Protected) │ │
│ │ - /api/tasks/* (Protected) │ │
│ │ - /api/annotations/* (Protected) │ │
│ └───────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌───────────────────▼───────────────────────────────────┐ │
│ │ Service Layer │ │
│ │ - AuthService (login, register, token) │ │
│ │ - UserService (user CRUD) │ │
│ │ - JWTService (token generation & validation) │ │
│ └───────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌───────────────────▼───────────────────────────────────┐ │
│ │ Data Layer │ │
│ │ - User Model │ │
│ │ - Database Connection │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌──────────────┐
│ SQLite DB │
│ - users │
└──────────────┘
User → POST /api/auth/register
→ AuthService.register_user()
→ Validate input (username, email, password)
→ Check username/email uniqueness
→ Hash password with bcrypt
→ Create user in database
→ Return success message
User → POST /api/auth/login
→ AuthService.login_user()
→ Validate credentials
→ Verify password with bcrypt
→ Generate Access Token (15 min)
→ Generate Refresh Token (7 days)
→ Return tokens + user info
User → POST /api/auth/refresh
→ AuthService.refresh_tokens()
→ Verify Refresh Token
→ Extract user info
→ Generate new Access Token
→ Generate new Refresh Token (rotation)
→ Return new tokens
User → GET /api/projects (with Authorization header)
→ Auth Middleware
→ Extract token from header
→ Verify JWT signature
→ Check expiration
→ Extract user info
→ Attach to request.state.user
→ Continue to route handler
→ Route handler uses request.state.user
class User:
    """In-memory representation of one row of the ``users`` table.

    Attributes mirror the table columns one-to-one. ``oauth_provider`` and
    ``oauth_id`` are reserved for a future OAuth integration and are ``None``
    for password-based accounts.
    """

    def __init__(
        self,
        id: str,
        username: str,
        email: str,
        password_hash: str,
        role: str,
        oauth_provider: Optional[str],
        oauth_id: Optional[str],
        created_at: datetime
    ):
        self.id = id
        self.username = username
        self.email = email
        self.password_hash = password_hash
        self.role = role
        self.oauth_provider = oauth_provider  # reserved for OAuth
        self.oauth_id = oauth_id  # reserved for OAuth
        # NOTE(review): whatever the database driver returns is stored as-is;
        # confirm whether callers receive a datetime or a timestamp string.
        self.created_at = created_at

    @classmethod
    def from_row(cls, row):
        """Build a ``User`` from a database row.

        Args:
            row: Any object that supports ``row["column"]`` lookup and
                ``row.keys()`` — for example a ``dict`` or a ``sqlite3.Row``.

        Returns:
            A populated ``User``. The two OAuth columns default to ``None``
            when the row does not contain them.

        Raises:
            KeyError / IndexError: If a mandatory column is missing.
        """
        # sqlite3.Row has no .get(); membership in keys() works for both
        # sqlite3.Row and plain dict rows.
        present = set(row.keys())

        def optional(column):
            return row[column] if column in present else None

        return cls(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            password_hash=row["password_hash"],
            role=row["role"],
            oauth_provider=optional("oauth_provider"),
            oauth_id=optional("oauth_id"),
            created_at=row["created_at"],
        )
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
class UserRegister(BaseModel):
    """Request body accepted by the registration endpoint."""

    # Required, 3 to 50 characters.
    username: str = Field(min_length=3, max_length=50)
    # Required, must parse as an e-mail address.
    email: EmailStr
    # Required plain-text password, 8 to 100 characters.
    password: str = Field(min_length=8, max_length=100)
class UserLogin(BaseModel):
    """Request body accepted by the login endpoint."""

    # Both fields are required; no length limits are enforced at login time.
    username: str
    password: str
class TokenResponse(BaseModel):
    """Response body returned after a successful login or token refresh."""

    access_token: str
    refresh_token: str
    # Authorization scheme the client should use when sending the access token.
    token_type: str = "bearer"
    # Forward reference: UserResponse is declared after this class.
    user: "UserResponse"
class TokenRefresh(BaseModel):
    """Request body accepted by the token refresh endpoint."""

    # The refresh token previously issued to the client.
    refresh_token: str
class UserResponse(BaseModel):
    """Public view of a user account (never includes the password hash)."""

    id: str
    username: str
    email: str
    role: str
    # Required: every producer of this model must supply the creation time.
    created_at: datetime
class TokenPayload(BaseModel):
    """Shape of the claims carried inside a JWT."""

    # Subject claim: holds the user id.
    sub: str
    username: str
    email: str
    role: str
    # Expiry and issued-at timestamps.
    exp: datetime
    iat: datetime
from datetime import datetime, timedelta
from typing import Dict, Optional
import jwt
from config import settings
class JWTService:
    """Issue and validate the signed JWTs used by the platform.

    Tokens are signed with ``settings.JWT_SECRET_KEY`` using
    ``settings.JWT_ALGORITHM``. Every token carries a ``type`` claim
    (``"access"`` or ``"refresh"``) so that one kind cannot be presented in
    place of the other, and a random ``jti`` claim so that two tokens issued
    for the same user within the same second are still distinct (required for
    refresh-token rotation to yield a new token).
    """

    @staticmethod
    def _encode(claims: Dict, lifetime: timedelta, token_type: str) -> str:
        """Sign ``claims`` together with the iat/exp/jti/type claims."""
        # Local imports: the surrounding module only imports datetime,
        # timedelta, typing, jwt and settings.
        import uuid
        from datetime import timezone

        # One timestamp for both claims, so exp - iat equals the configured
        # lifetime exactly. Timezone-aware to avoid the deprecated utcnow().
        issued_at = datetime.now(timezone.utc)
        payload = {
            **claims,
            "exp": issued_at + lifetime,
            "iat": issued_at,
            "jti": uuid.uuid4().hex,
            "type": token_type,
        }
        return jwt.encode(
            payload,
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM,
        )

    @staticmethod
    def create_access_token(user_data: Dict) -> str:
        """Create a short-lived access token.

        Args:
            user_data: Dict with the keys ``id``, ``username``, ``email`` and
                ``role``; any other keys are ignored.

        Returns:
            Encoded JWT string valid for
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        claims = {
            "sub": user_data["id"],
            "username": user_data["username"],
            "email": user_data["email"],
            "role": user_data["role"],
        }
        lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        return JWTService._encode(claims, lifetime, "access")

    @staticmethod
    def create_refresh_token(user_data: Dict) -> str:
        """Create a long-lived refresh token.

        Args:
            user_data: Dict containing at least the key ``id``.

        Returns:
            Encoded JWT string valid for
            ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days.

        Raises:
            KeyError: If ``id`` is missing.
        """
        lifetime = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
        return JWTService._encode({"sub": user_data["id"]}, lifetime, "refresh")

    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> Optional[Dict]:
        """Verify the signature and expiry of ``token`` and decode it.

        Args:
            token: Encoded JWT string.
            token_type: Expected value of the ``type`` claim
                (``"access"`` or ``"refresh"``).

        Returns:
            The decoded payload, or ``None`` when the token is valid but its
            ``type`` claim differs from ``token_type``.

        Raises:
            jwt.ExpiredSignatureError: The token has expired.
            jwt.InvalidTokenError: The token is malformed or its signature
                does not verify.
        """
        payload = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
        if payload.get("type") != token_type:
            return None
        return payload
import bcrypt
import uuid
from typing import Optional, Dict
from database import get_db_connection
from models import User
from services.jwt_service import JWTService
from fastapi import HTTPException, status
class AuthService:
    """Registration, login, token refresh and user lookup.

    All methods are static and open their own database connection through
    ``get_db_connection()``.
    """

    @staticmethod
    def _issue_tokens(user: User) -> Dict:
        """Create a fresh access/refresh pair plus the public user info."""
        claims = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "role": user.role,
        }
        return {
            "access_token": JWTService.create_access_token(claims),
            "refresh_token": JWTService.create_refresh_token(claims),
            # created_at is part of the public profile but not of the token
            # claims; the router builds a UserResponse from this dict and
            # that schema requires created_at.
            "user": {**claims, "created_at": user.created_at},
        }

    @staticmethod
    def register_user(username: str, email: str, password: str) -> User:
        """Register a new user with the default ``annotator`` role.

        Args:
            username: Unique username.
            email: Unique e-mail address.
            password: Plain-text password; only its bcrypt hash is stored.

        Returns:
            The newly created ``User`` as read back from the database.

        Raises:
            HTTPException: 409 if the username or e-mail is already taken.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()

            cursor.execute(
                "SELECT id FROM users WHERE username = ?",
                (username,)
            )
            if cursor.fetchone():
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="用户名已被使用"
                )

            cursor.execute(
                "SELECT id FROM users WHERE email = ?",
                (email,)
            )
            if cursor.fetchone():
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="邮箱已被使用"
                )

            password_hash = bcrypt.hashpw(
                password.encode('utf-8'),
                bcrypt.gensalt()
            ).decode('utf-8')

            user_id = f"user_{uuid.uuid4().hex[:12]}"
            # NOTE(review): no explicit commit here — presumably the
            # get_db_connection() context manager commits on exit; confirm.
            cursor.execute(
                "INSERT INTO users (id, username, email, password_hash, role) "
                "VALUES (?, ?, ?, ?, ?)",
                (user_id, username, email, password_hash, "annotator")
            )

            # Read the row back so database-side defaults are included.
            cursor.execute(
                "SELECT * FROM users WHERE id = ?",
                (user_id,)
            )
            return User.from_row(cursor.fetchone())

    @staticmethod
    def login_user(username: str, password: str) -> Dict:
        """Authenticate a user and issue tokens.

        Args:
            username: Username.
            password: Plain-text password.

        Returns:
            Dict with ``access_token``, ``refresh_token`` and ``user``
            (id, username, email, role, created_at).

        Raises:
            HTTPException: 401 if the user does not exist or the password is
                wrong; both cases share the same message.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM users WHERE username = ?",
                (username,)
            )
            row = cursor.fetchone()

        if not row:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码错误"
            )

        user = User.from_row(row)
        password_ok = bcrypt.checkpw(
            password.encode('utf-8'),
            user.password_hash.encode('utf-8')
        )
        if not password_ok:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码错误"
            )

        return AuthService._issue_tokens(user)

    @staticmethod
    def refresh_tokens(refresh_token: str) -> Dict:
        """Exchange a refresh token for a new access/refresh pair.

        Args:
            refresh_token: Refresh token previously issued to the client.

        Returns:
            Dict with ``access_token``, ``refresh_token`` and ``user``.

        Raises:
            HTTPException: 401 if the token is expired, malformed, not a
                refresh token, or belongs to a user that no longer exists.
        """
        # Local import: this module does not import jwt at the top level.
        import jwt

        # Only token problems become a 401. Database failures propagate so
        # they are not misreported as an authentication error.
        try:
            payload = JWTService.verify_token(refresh_token, "refresh")
        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError) as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="刷新令牌已过期或无效,请重新登录"
            ) from exc

        if not payload:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的刷新令牌"
            )

        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM users WHERE id = ?",
                (payload["sub"],)
            )
            row = cursor.fetchone()

        if not row:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户不存在"
            )

        # Token rotation: both tokens are re-issued from the current row, so
        # role or e-mail changes are picked up.
        return AuthService._issue_tokens(User.from_row(row))

    @staticmethod
    def get_current_user(user_id: str) -> User:
        """Look up a user by id.

        Args:
            user_id: User unique identifier.

        Returns:
            The matching ``User``.

        Raises:
            HTTPException: 404 if no such user exists.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM users WHERE id = ?",
                (user_id,)
            )
            row = cursor.fetchone()

        if not row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="用户不存在"
            )
        return User.from_row(row)
from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from services.jwt_service import JWTService
import jwt
class AuthMiddleware(BaseHTTPMiddleware):
    """Validate the bearer access token on every non-public request.

    On success the decoded identity is stored in ``request.state.user`` as a
    dict with ``id``, ``username``, ``email`` and ``role``. On failure a JSON
    error of the form ``{"detail": ..., "error_type": ...}`` is returned and
    the route handler is not called.
    """

    # Exact paths that are served without authentication.
    PUBLIC_PATHS = {
        "/",
        "/health",
        "/docs",
        "/openapi.json",
        "/redoc",
        "/api/auth/register",
        "/api/auth/login",
        "/api/auth/refresh",
    }

    @staticmethod
    def _reject(detail, error_type, status_code=status.HTTP_401_UNAUTHORIZED):
        """Build the uniform JSON error response."""
        return JSONResponse(
            status_code=status_code,
            content={"detail": detail, "error_type": error_type},
        )

    async def dispatch(self, request: Request, call_next):
        """Authenticate the request, then hand it to the next handler.

        Args:
            request: Incoming request.
            call_next: Callable that invokes the next middleware or route.

        Returns:
            The downstream response, or a 401/500 JSON error response when
            authentication fails.
        """
        # Public paths and CORS preflight requests skip authentication.
        if request.url.path in self.PUBLIC_PATHS or request.method == "OPTIONS":
            return await call_next(request)

        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return self._reject("缺少认证令牌", "missing_token")

        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return self._reject("无效的认证令牌格式", "invalid_token_format")

        # Only token verification is guarded. Exceptions raised later by the
        # route handler must not be reported as authentication failures.
        try:
            payload = JWTService.verify_token(parts[1], "access")
        except jwt.ExpiredSignatureError:
            return self._reject("认证令牌已过期", "token_expired")
        except jwt.InvalidTokenError:
            return self._reject("无效的认证令牌", "invalid_token")
        except Exception:
            return self._reject(
                "认证过程发生错误",
                "auth_error",
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

        # verify_token returns None when the token is not an access token.
        if not payload:
            return self._reject("无效的认证令牌", "invalid_token")

        try:
            user = {
                "id": payload["sub"],
                "username": payload["username"],
                "email": payload["email"],
                "role": payload["role"],
            }
        except KeyError:
            # A correctly signed token without the expected claims is unusable.
            return self._reject("无效的认证令牌", "invalid_token")

        request.state.user = user
        return await call_next(request)
def require_role(*allowed_roles: str):
    """Build a decorator that restricts an async endpoint to given roles.

    Usage:
        @require_role("admin", "annotator")
        async def my_endpoint(request: Request):
            ...

    The decorated endpoint must accept the request as its first parameter
    (named ``request``); the role is read from ``request.state.user``.

    Args:
        allowed_roles: Role names that may call the endpoint.

    Returns:
        A decorator. The wrapped endpoint raises ``HTTPException`` 401 when
        no user is attached to the request and 403 when the user's role is
        not in ``allowed_roles``.
    """
    # Local import keeps this block independent of the module's import list.
    import functools

    def decorator(func):
        # wraps() preserves the endpoint's name, docstring and signature, so
        # FastAPI introspects the real parameters instead of *args/**kwargs.
        @functools.wraps(func)
        async def wrapper(request, *args, **kwargs):
            user = getattr(request.state, "user", None)
            if not user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="未认证"
                )
            if user["role"] not in allowed_roles:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="权限不足"
                )
            return await func(request, *args, **kwargs)

        return wrapper

    return decorator
from fastapi import APIRouter, HTTPException, status, Request
from schemas.auth import (
UserRegister, UserLogin, TokenResponse,
TokenRefresh, UserResponse
)
from services.auth_service import AuthService
# Every route declared on this router is served under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["authentication"])
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(user_data: UserRegister):
    """Create a new user account.

    Args:
        user_data: Validated registration payload.

    Returns:
        Dict with a success ``message`` and the new ``user_id``.

    Raises:
        HTTPException: 409 if the username or e-mail already exists
            (raised by ``AuthService.register_user``).
    """
    created = AuthService.register_user(
        user_data.username,
        user_data.email,
        user_data.password,
    )
    return {"message": "用户注册成功", "user_id": created.id}
@router.post("/login", response_model=TokenResponse)
async def login(credentials: UserLogin):
    """Authenticate a user and return JWT tokens.

    Args:
        credentials: Username and password.

    Returns:
        ``TokenResponse`` with the access token, refresh token and user info.

    Raises:
        HTTPException: 401 if the credentials are invalid
            (raised by ``AuthService.login_user``).
    """
    outcome = AuthService.login_user(credentials.username, credentials.password)
    access = outcome["access_token"]
    refresh = outcome["refresh_token"]
    profile = UserResponse(**outcome["user"])
    return TokenResponse(access_token=access, refresh_token=refresh, user=profile)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(token_data: TokenRefresh):
    """Exchange a refresh token for a new token pair.

    Args:
        token_data: Payload carrying the refresh token.

    Returns:
        ``TokenResponse`` with the new access token, the new refresh token
        and user info.

    Raises:
        HTTPException: 401 if the refresh token is invalid or expired
            (raised by ``AuthService.refresh_tokens``).
    """
    outcome = AuthService.refresh_tokens(token_data.refresh_token)
    access = outcome["access_token"]
    refresh = outcome["refresh_token"]
    profile = UserResponse(**outcome["user"])
    return TokenResponse(access_token=access, refresh_token=refresh, user=profile)
@router.get("/me", response_model=UserResponse)
async def get_current_user(request: Request):
    """Return the profile of the authenticated caller.

    Args:
        request: Request whose ``state.user`` holds the caller's identity.

    Returns:
        ``UserResponse`` built from the caller's current database record.

    Raises:
        HTTPException: 401 if no user is attached to the request; 404 (from
            ``AuthService.get_current_user``) if the user no longer exists.
    """
    identity = getattr(request.state, "user", None)
    if not identity:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未认证"
        )

    record = AuthService.get_current_user(identity["id"])
    exposed = ("id", "username", "email", "role", "created_at")
    return UserResponse(**{name: getattr(record, name) for name in exposed})
import os
import secrets
from pydantic_settings import BaseSettings
# Generated once per process. The same value is the fallback default for
# JWT_SECRET_KEY and the sentinel that detects that no secret was configured.
_FALLBACK_JWT_SECRET = secrets.token_urlsafe(32)


class Settings(BaseSettings):
    """Application settings read from environment variables or ``.env``."""

    # JWT settings
    JWT_SECRET_KEY: str = os.getenv("JWT_SECRET_KEY", _FALLBACK_JWT_SECRET)
    JWT_ALGORITHM: str = os.getenv("JWT_ALGORITHM", "HS256")
    ACCESS_TOKEN_EXPIRE_MINUTES: int = int(
        os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "15")
    )
    REFRESH_TOKEN_EXPIRE_DAYS: int = int(
        os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7")
    )

    # Database settings
    DATABASE_PATH: str = os.getenv(
        "DATABASE_PATH",
        "annotation_platform.db"
    )

    # OAuth settings (reserved for a future integration)
    OAUTH_ENABLED: bool = os.getenv("OAUTH_ENABLED", "false").lower() == "true"
    OAUTH_PROVIDER_URL: str = os.getenv("OAUTH_PROVIDER_URL", "")
    OAUTH_CLIENT_ID: str = os.getenv("OAUTH_CLIENT_ID", "")
    OAUTH_CLIENT_SECRET: str = os.getenv("OAUTH_CLIENT_SECRET", "")

    class Config:
        env_file = ".env"
        case_sensitive = True


settings = Settings()

# Warn when the process runs on the randomly generated fallback secret: such
# a secret changes on every start, so previously issued tokens stop verifying.
# Comparing against a newly generated token would never match, so the stored
# fallback value is used instead.
if settings.JWT_SECRET_KEY == _FALLBACK_JWT_SECRET:
    import logging
    logging.warning(
        "使用默认生成的 JWT_SECRET_KEY,生产环境请设置环境变量"
    )
-- Account table for the annotation platform. One row per user; the two
-- oauth_* columns are nullable placeholders for a future OAuth integration.
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'annotator',
oauth_provider TEXT, -- reserved: 'google', 'github', etc.
oauth_id TEXT, -- reserved: OAuth provider user ID
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for performance
-- NOTE(review): username and email are already declared UNIQUE above, and
-- SQLite maintains an index for each UNIQUE constraint, so the first two
-- indexes may be redundant — confirm before removing.
-- NOTE(review): updated_at only receives its default on INSERT; nothing in
-- this script refreshes it on UPDATE.
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_oauth ON users(oauth_provider, oauth_id);
| 字段 | 类型 | 说明 | 约束 |
|---|---|---|---|
| id | TEXT | 用户唯一标识 | PRIMARY KEY, 格式: user_xxxxx |
| username | TEXT | 用户名 | NOT NULL, UNIQUE, 3-50字符 |
| email | TEXT | 邮箱地址 | NOT NULL, UNIQUE, 有效邮箱格式 |
| password_hash | TEXT | bcrypt 加密的密码 | NOT NULL |
| role | TEXT | 用户角色 | NOT NULL, 枚举: admin/annotator/viewer |
| oauth_provider | TEXT | OAuth 提供商 | NULLABLE, 预留字段 |
| oauth_id | TEXT | OAuth 用户 ID | NULLABLE, 预留字段 |
| created_at | TIMESTAMP | 创建时间 | DEFAULT CURRENT_TIMESTAMP |
| updated_at | TIMESTAMP | 更新时间 | DEFAULT CURRENT_TIMESTAMP |
{
"sub": "user_abc123def456",
"username": "john_doe",
"email": "john@example.com",
"role": "annotator",
"type": "access",
"iat": 1704067200,
"exp": 1704068100
}
{
"sub": "user_abc123def456",
"type": "refresh",
"iat": 1704067200,
"exp": 1704672000
}
POST /api/auth/register
Content-Type: application/json
{
"username": "john_doe",
"email": "john@example.com",
"password": "SecurePass123!"
}
HTTP/1.1 201 Created
Content-Type: application/json
{
"message": "用户注册成功",
"user_id": "user_abc123def456"
}
POST /api/auth/login
Content-Type: application/json
{
"username": "john_doe",
"password": "SecurePass123!"
}
HTTP/1.1 200 OK
Content-Type: application/json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"user": {
"id": "user_abc123def456",
"username": "john_doe",
"email": "john@example.com",
"role": "annotator",
"created_at": "2024-01-01T00:00:00Z"
}
}
GET /api/projects
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
属性(Property)是系统在所有有效执行中应该保持为真的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性作为人类可读规范和机器可验证正确性保证之间的桥梁。
*对于任何*有效的用户名、邮箱和密码组合,注册后应该在数据库中创建一个新用户记录,且该记录包含正确的用户名、邮箱和加密后的密码。
Validates: Requirements 1.1, 1.5
*对于任何*注册的用户,数据库中存储的 password_hash 字段应该不等于原始密码,且应该是有效的 bcrypt 哈希格式(以 $2b$ 开头)。
Validates: Requirements 1.5, 7.1
*对于任何*长度小于 8 个字符的密码字符串,注册请求应该返回 400 错误(注意:FastAPI 对 Pydantic 校验失败默认返回 422,需要注册自定义的 RequestValidationError 处理器才能返回 400),且不应该在数据库中创建用户记录。
Validates: Requirements 1.4
*对于任何*新注册的用户,其角色字段应该自动设置为 "annotator"。
Validates: Requirements 1.6
*对于任何*已注册的用户,使用正确的用户名和密码登录应该返回包含 access_token、refresh_token 和 user 信息的响应,且两个 token 都应该是有效的 JWT 格式。
Validates: Requirements 2.1, 2.6
*对于任何*登录成功返回的 access_token,解码后的 payload 应该包含 sub(用户ID)、username、email 和 role 字段。
Validates: Requirements 2.3
*对于任何*生成的 access_token,其过期时间(exp)与签发时间(iat)的差值应该等于配置的 ACCESS_TOKEN_EXPIRE_MINUTES;对于任何 refresh_token,差值应该等于配置的 REFRESH_TOKEN_EXPIRE_DAYS。
Validates: Requirements 2.4, 2.5
*对于任何*有效的 refresh_token,使用它刷新应该返回新的 access_token 和 refresh_token,且新的 refresh_token 应该与旧的不同(token rotation)。
Validates: Requirements 3.1, 3.4
*对于任何*受保护的端点和有效的 JWT access_token,请求应该成功通过中间件验证,且 request.state.user 应该包含正确的用户信息(id、username、email、role)。
Validates: Requirements 4.1, 4.5
*对于任何*已认证的用户,请求 /api/auth/me 端点应该返回该用户的完整信息(id、username、email、role、created_at),且信息应该与数据库中的记录一致。
Validates: Requirements 6.1
*对于任何* /api/projects、/api/tasks 或 /api/annotations 端点,不带有效 token 的请求应该返回 401 错误,带有效 token 的请求应该成功处理。
Validates: Requirements 11.1, 11.2, 11.3
*对于任何*需要特定角色的端点,只有具有允许角色的用户才能成功访问,其他角色的用户应该收到 403 错误。
Validates: Requirements 12.2, 12.3, 12.4
所有错误响应遵循统一的 JSON 格式:
{
"detail": "错误描述信息",
"error_type": "error_category"
}
| Error Type | Detail | 场景 |
|---|---|---|
| missing_token | 缺少认证令牌 | Authorization header 不存在 |
| invalid_token_format | 无效的认证令牌格式 | 不是 "Bearer <token>" 格式 |
| invalid_token | 无效的认证令牌 | JWT 签名验证失败 |
| token_expired | 认证令牌已过期 | JWT 过期时间已过 |
| invalid_credentials | 用户名或密码错误 | 登录凭据不正确 |
| user_not_found | 用户不存在 | 刷新 token 时用户已被删除 |
| Error Type | Detail | 场景 |
|---|---|---|
| insufficient_permissions | 权限不足 | 用户角色不满足端点要求 |
| Error Type | Detail | 场景 |
|---|---|---|
| validation_error | 输入验证失败 | Pydantic 验证失败 |
| password_too_short | 密码长度不足 | 密码少于 8 个字符 |
| invalid_email | 无效的邮箱格式 | 邮箱格式不正确 |
| Error Type | Detail | 场景 |
|---|---|---|
| username_exists | 用户名已被使用 | 注册时用户名重复 |
| email_exists | 邮箱已被使用 | 注册时邮箱重复 |
| Error Type | Detail | 场景 |
|---|---|---|
| auth_error | 认证过程发生错误 | 中间件内部错误 |
| database_error | 数据库操作失败 | 数据库连接或查询错误 |
| internal_error | 服务器内部错误 | 未预期的异常 |
import logging

logger = logging.getLogger(__name__)

# Record failed logins (never log the password). Lazy %-style arguments let
# the logging module skip formatting when the level is disabled.
logger.warning(
    "Login failed for username: %s",
    username,
    extra={"username": username, "ip": request.client.host},
)

# Record server-side errors together with the active traceback.
logger.error(
    "Authentication error: %s",
    e,
    exc_info=True,
    extra={"user_id": user_id},
)
FastAPI Exception Handlers
↓
Custom Exception Classes
↓
Service Layer Exceptions
↓
Database Exceptions
class AuthenticationError(Exception):
    """Root of the authentication exception hierarchy."""
class InvalidCredentialsError(AuthenticationError):
    """Raised when the username or password is wrong."""
class TokenExpiredError(AuthenticationError):
    """Raised when a JWT is past its expiry time."""
class InsufficientPermissionsError(AuthenticationError):
    """Raised when the user's role does not grant the required access."""
本项目采用双重测试方法,结合单元测试和属性测试,确保全面的代码覆盖和正确性验证。
# requirements.txt
pytest==7.4.3
pytest-asyncio==0.21.1
hypothesis==6.92.1
faker==20.1.0 # 生成测试数据
backend/test/
├── __init__.py
├── conftest.py # pytest fixtures
├── test_auth_service.py # AuthService 单元测试
├── test_jwt_service.py # JWTService 单元测试
├── test_auth_middleware.py # 中间件单元测试
├── test_auth_router.py # 路由集成测试
├── test_properties_auth.py # 属性测试
└── test_integration_auth.py # 端到端集成测试
[pytest]
testpaths = test
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
markers =
unit: Unit tests
integration: Integration tests
property: Property-based tests
slow: Slow running tests
import pytest
import os
from database import init_database, get_db_connection
from fastapi.testclient import TestClient
from main import app
@pytest.fixture(scope="function")
def test_db(tmp_path, monkeypatch):
    """Provide an isolated, initialised database for a single test.

    A file inside pytest's per-test temporary directory is used instead of
    ``:memory:``. Every new SQLite connection to ``:memory:`` opens its own
    empty database, so tables created by ``init_database()`` would not be
    visible to connections opened afterwards.

    ``monkeypatch`` restores ``DATABASE_PATH`` after the test, so the setting
    does not leak into other tests.
    """
    # NOTE(review): assumes the database module resolves DATABASE_PATH when a
    # connection is opened rather than once at import time — confirm.
    monkeypatch.setenv(
        "DATABASE_PATH", str(tmp_path / "test_annotation_platform.db")
    )
    init_database()
    yield
    # The file is removed together with tmp_path by pytest.
@pytest.fixture
def client(test_db):
    """Return a ``TestClient`` bound to the application under test.

    Depends on ``test_db`` so the database is initialised first.
    """
    http_client = TestClient(app)
    return http_client
@pytest.fixture
def sample_user_data():
    """Return a valid registration payload shared by the tests."""
    return dict(
        username="testuser",
        email="test@example.com",
        password="TestPass123!",
    )
@pytest.fixture
def registered_user(client, sample_user_data):
    """Register the sample user through the API and return its payload."""
    registration = client.post("/api/auth/register", json=sample_user_data)
    assert registration.status_code == 201
    return sample_user_data
@pytest.fixture
def auth_token(client, registered_user):
    """Log the registered user in and return the access token."""
    credentials = {
        field: registered_user[field] for field in ("username", "password")
    }
    login = client.post("/api/auth/login", json=credentials)
    assert login.status_code == 200
    return login.json()["access_token"]
# test/test_auth_service.py
import pytest
from services.auth_service import AuthService
from fastapi import HTTPException
@pytest.mark.unit
def test_register_user_success(test_db, sample_user_data):
    """A valid registration stores the user with a bcrypt hash and default role."""
    created = AuthService.register_user(**sample_user_data)

    # Identity fields are stored unchanged.
    assert created.username == sample_user_data["username"]
    assert created.email == sample_user_data["email"]
    assert created.role == "annotator"

    # The password is stored only as a bcrypt hash.
    stored_hash = created.password_hash
    assert stored_hash != sample_user_data["password"]
    assert stored_hash.startswith("$2b$")
@pytest.mark.unit
def test_register_duplicate_username(test_db, sample_user_data):
    """Registering an already-used username is rejected with 409."""
    AuthService.register_user(**sample_user_data)

    # Same username, different e-mail: only the username collides.
    clashing = {**sample_user_data, "email": "different@example.com"}
    with pytest.raises(HTTPException) as exc_info:
        AuthService.register_user(**clashing)

    error = exc_info.value
    assert error.status_code == 409
    assert "用户名已被使用" in error.detail
@pytest.mark.unit
def test_register_short_password(test_db):
    """A password shorter than 8 characters fails schema validation.

    The length rule lives on the ``UserRegister`` schema, not in
    ``AuthService``, so the schema is exercised directly here.
    """
    # Local imports: this test module does not import these at the top.
    from pydantic import ValidationError
    from schemas.auth import UserRegister

    with pytest.raises(ValidationError):
        UserRegister(
            username="shortpw_user",
            email="shortpw@example.com",
            password="short",
        )
# test/test_jwt_service.py
import pytest
import jwt
from datetime import datetime, timedelta
from services.jwt_service import JWTService
from config import settings
@pytest.mark.unit
def test_create_access_token():
    """An access token round-trips the user's identity claims."""
    user_data = {
        "id": "user_123",
        "username": "testuser",
        "email": "test@example.com",
        "role": "annotator",
    }

    encoded = JWTService.create_access_token(user_data)
    claims = jwt.decode(
        encoded,
        settings.JWT_SECRET_KEY,
        algorithms=[settings.JWT_ALGORITHM],
    )

    # The user id travels in the standard "sub" claim.
    assert claims["sub"] == user_data["id"]
    for field in ("username", "email", "role"):
        assert claims[field] == user_data[field]
    assert claims["type"] == "access"
@pytest.mark.unit
def test_verify_expired_token():
    """Verifying a token whose ``exp`` is in the past raises."""
    user_data = {"id": "user_123"}

    # Hand-craft a token that expired one minute ago.
    already_expired = datetime.utcnow() - timedelta(minutes=1)
    stale_token = jwt.encode(
        {
            "sub": user_data["id"],
            "exp": already_expired,
            "iat": datetime.utcnow(),
            "type": "access",
        },
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )

    with pytest.raises(jwt.ExpiredSignatureError):
        JWTService.verify_token(stale_token, "access")
# test/test_properties_auth.py
import bcrypt
import jwt as pyjwt
import pytest
from fastapi import HTTPException
from hypothesis import HealthCheck, given, strategies as st, settings
from services.auth_service import AuthService
from services.jwt_service import JWTService
# Hypothesis strategies that generate registration inputs.

# Usernames: 3-50 characters drawn from upper-case letters, lower-case
# letters and decimal digits (Unicode categories Lu, Ll, Nd).
_username_alphabet = st.characters(whitelist_categories=("Lu", "Ll", "Nd"))
username_strategy = st.text(alphabet=_username_alphabet, min_size=3, max_size=50)

# E-mail addresses as produced by Hypothesis.
email_strategy = st.emails()

# Passwords: 8-100 characters of arbitrary text.
password_strategy = st.text(min_size=8, max_size=100)
@pytest.mark.property
@settings(
    max_examples=100,
    # bcrypt hashing is slow by design and exceeds the default deadline.
    deadline=None,
    # test_db is function-scoped and therefore shared by all examples.
    suppress_health_check=[HealthCheck.function_scoped_fixture],
)
@given(
    username=username_strategy,
    email=email_strategy,
    password=password_strategy
)
def test_property_password_hash_not_reversible(test_db, username, email, password):
    """
    Property 2: password hashes are not reversible.

    Feature: jwt-authentication, Property 2: for any registered user the
    stored password_hash differs from the original password and is a valid
    bcrypt hash.
    """
    # Only a rejected registration (e.g. a 409 because an earlier example
    # used the same username) skips the example. Assertion failures must
    # propagate, so the assertions sit outside the try block.
    try:
        user = AuthService.register_user(username, email, password)
    except HTTPException:
        return

    assert user.password_hash != password
    assert user.password_hash.startswith("$2b$")
    assert bcrypt.checkpw(
        password.encode('utf-8'),
        user.password_hash.encode('utf-8')
    )
@pytest.mark.property
@settings(
    max_examples=100,
    # test_db is function-scoped and therefore shared by all examples.
    suppress_health_check=[HealthCheck.function_scoped_fixture],
)
@given(password=st.text(min_size=0, max_size=7))
def test_property_short_passwords_rejected(test_db, password):
    """
    Property 3: short passwords are rejected.

    Feature: jwt-authentication, Property 3: any password shorter than 8
    characters is rejected before a user can be created. The length rule is
    enforced by the ``UserRegister`` schema, so the schema is checked here.
    """
    # Local imports: this test module does not import these at the top.
    from pydantic import ValidationError
    from schemas.auth import UserRegister

    with pytest.raises(ValidationError):
        UserRegister(
            username="property_user",
            email="property@example.com",
            password=password,
        )
@pytest.mark.property
@settings(
    max_examples=100,
    # bcrypt hashing is slow by design and exceeds the default deadline.
    deadline=None,
    # test_db is function-scoped and therefore shared by all examples.
    suppress_health_check=[HealthCheck.function_scoped_fixture],
)
@given(
    username=username_strategy,
    email=email_strategy,
    password=password_strategy
)
def test_property_new_user_default_role(test_db, username, email, password):
    """
    Property 4: new users get the default role.

    Feature: jwt-authentication, Property 4: the role of any newly registered
    user is automatically set to "annotator".
    """
    # Skip only when registration itself is rejected; a failing assertion
    # must not be swallowed.
    try:
        user = AuthService.register_user(username, email, password)
    except HTTPException:
        return

    assert user.role == "annotator"
@pytest.mark.property
@settings(
    max_examples=100,
    # bcrypt hashing is slow by design and exceeds the default deadline.
    deadline=None,
    # test_db is function-scoped and therefore shared by all examples.
    suppress_health_check=[HealthCheck.function_scoped_fixture],
)
@given(
    username=username_strategy,
    email=email_strategy,
    password=password_strategy
)
def test_property_login_returns_valid_tokens(test_db, username, email, password):
    """
    Property 5: login returns valid tokens.

    Feature: jwt-authentication, Property 5: for any registered user, logging
    in with the correct username and password returns valid JWT tokens.
    """
    # A rejected registration skips the example. Login of a freshly
    # registered user must succeed, so it is outside the try block.
    try:
        AuthService.register_user(username, email, password)
    except HTTPException:
        return

    result = AuthService.login_user(username, password)

    for key in ("access_token", "refresh_token", "user"):
        assert key in result

    # Decoding without signature verification checks the JWT structure only.
    access_payload = pyjwt.decode(
        result["access_token"],
        options={"verify_signature": False}
    )
    refresh_payload = pyjwt.decode(
        result["refresh_token"],
        options={"verify_signature": False}
    )
    assert access_payload["type"] == "access"
    assert refresh_payload["type"] == "refresh"
@pytest.mark.property
def test_property_access_token_contains_user_info(test_db, sample_user_data):
    """
    Property 6: the access token carries the full user info.

    Feature: jwt-authentication, Property 6: the decoded payload of any
    access_token returned by a successful login contains sub, username,
    email and role.

    This test runs on one fixed example (no ``@given``), so it must not be
    decorated with Hypothesis ``@settings``, which Hypothesis rejects on
    tests without ``@given``.
    """
    AuthService.register_user(**sample_user_data)
    result = AuthService.login_user(
        sample_user_data["username"],
        sample_user_data["password"]
    )

    payload = pyjwt.decode(
        result["access_token"],
        options={"verify_signature": False}
    )

    # "sub" holds the user id.
    for claim in ("sub", "username", "email", "role"):
        assert claim in payload
    assert payload["username"] == sample_user_data["username"]
    assert payload["email"] == sample_user_data["email"]
@pytest.mark.property
def test_property_token_expiration_times(test_db, sample_user_data):
    """
    Property 7: token lifetimes match the configuration.

    Feature: jwt-authentication, Property 7: for any issued token,
    ``exp - iat`` matches the configured lifetime.

    This test runs on one fixed example (no ``@given``), so it must not be
    decorated with Hypothesis ``@settings``.
    """
    # Aliased so the application settings cannot be confused with the
    # Hypothesis ``settings`` object imported at module level.
    from config import settings as app_settings

    AuthService.register_user(**sample_user_data)
    result = AuthService.login_user(
        sample_user_data["username"],
        sample_user_data["password"]
    )

    def lifetime_seconds(token):
        # exp and iat decode to integer epoch seconds. Subtracting them
        # directly avoids local-time conversions, which can shift by an hour
        # across a daylight-saving change.
        claims = pyjwt.decode(token, options={"verify_signature": False})
        return claims["exp"] - claims["iat"]

    access_minutes = lifetime_seconds(result["access_token"]) / 60
    assert abs(access_minutes - app_settings.ACCESS_TOKEN_EXPIRE_MINUTES) < 1

    refresh_days = lifetime_seconds(result["refresh_token"]) / (60 * 60 * 24)
    assert abs(refresh_days - app_settings.REFRESH_TOKEN_EXPIRE_DAYS) < 0.1
@pytest.mark.property
def test_property_token_refresh_rotation(test_db, sample_user_data):
    """
    Property 8: refreshing yields new tokens.

    Feature: jwt-authentication, Property 8: for any valid refresh_token,
    refreshing returns new tokens and the new refresh_token differs from the
    old one.

    This test runs on one fixed example (no ``@given``), so it must not be
    decorated with Hypothesis ``@settings``.
    """
    AuthService.register_user(**sample_user_data)
    first = AuthService.login_user(
        sample_user_data["username"],
        sample_user_data["password"]
    )
    old_refresh_token = first["refresh_token"]

    # NOTE(review): login and refresh happen within the same second here, so
    # the inequality checks only hold if each issued token contains some
    # per-token unique value — confirm the token service provides one.
    second = AuthService.refresh_tokens(old_refresh_token)
    new_access_token = second["access_token"]
    new_refresh_token = second["refresh_token"]

    assert new_access_token != first["access_token"]
    assert new_refresh_token != old_refresh_token

    access_payload = pyjwt.decode(
        new_access_token,
        options={"verify_signature": False}
    )
    assert access_payload["type"] == "access"
# test/test_integration_auth.py
import pytest
from fastapi.testclient import TestClient
@pytest.mark.integration
def test_full_authentication_flow(client):
    """Walk the whole auth flow: register, login, protected access, refresh."""
    account = {
        "username": "integrationuser",
        "email": "integration@example.com",
        "password": "IntegrationPass123!",
    }

    # 1. Register user
    register_resp = client.post("/api/auth/register", json=account)
    assert register_resp.status_code == 201

    # 2. Login with the credentials that were just registered
    credentials = {key: account[key] for key in ("username", "password")}
    login_resp = client.post("/api/auth/login", json=credentials)
    assert login_resp.status_code == 200
    issued = login_resp.json()
    access_token = issued["access_token"]
    refresh_token = issued["refresh_token"]

    # 3. Access protected endpoint
    bearer = {"Authorization": f"Bearer {access_token}"}
    me_resp = client.get("/api/auth/me", headers=bearer)
    assert me_resp.status_code == 200
    assert me_resp.json()["username"] == account["username"]

    # 4. Access protected project endpoint
    projects_resp = client.get("/api/projects", headers=bearer)
    assert projects_resp.status_code == 200

    # 5. Refresh tokens
    refresh_resp = client.post(
        "/api/auth/refresh",
        json={"refresh_token": refresh_token},
    )
    assert refresh_resp.status_code == 200
    renewed = refresh_resp.json()
    assert renewed["access_token"] != access_token

    # 6. Use new token
    renewed_bearer = {"Authorization": f"Bearer {renewed['access_token']}"}
    renewed_me_resp = client.get("/api/auth/me", headers=renewed_bearer)
    assert renewed_me_resp.status_code == 200
@pytest.mark.integration
def test_middleware_protects_endpoints(client, auth_token):
    """
    Property 11: existing API endpoints are protected.

    Feature: jwt-authentication, Property 11: for any protected endpoint, a
    request without a token should return 401 and a request with a valid
    token should succeed.
    """
    bearer = {"Authorization": f"Bearer {auth_token}"}

    for path in ("/api/projects", "/api/tasks", "/api/annotations"):
        # Anonymous request must be rejected.
        anonymous = client.get(path)
        assert anonymous.status_code == 401

        # Authenticated request must go through.
        authorized = client.get(path, headers=bearer)
        assert authorized.status_code == 200
@pytest.mark.integration
def test_public_endpoints_accessible(client):
    """Check that public endpoints can be reached without authentication."""
    # 404 is accepted because an endpoint may simply not exist.
    acceptable = (200, 404)

    for path in ("/", "/health", "/docs", "/openapi.json"):
        reply = client.get(path)
        assert reply.status_code in acceptable
# Run all tests
pytest
# Run unit tests
pytest -m unit
# Run property tests
pytest -m property
# Run integration tests
pytest -m integration
# Generate a coverage report
pytest --cov=. --cov-report=html
# Run a specific test file
pytest test/test_auth_service.py
# Verbose output
pytest -v
# Show print output
pytest -s
前端需要实现以下功能来与 JWT 认证系统集成:
// web/apps/lq_label/src/atoms/auth-atoms.ts
import { atom } from 'jotai';
import { atomWithStorage } from 'jotai/utils';
/** User record held by `currentUserAtom`. */
interface User {
id: string;
username: string;
email: string;
role: string;
// presumably a serialized timestamp; confirm the format against the backend
created_at: string;
}
/** Access/refresh token pair held by `authTokensAtom`. */
interface AuthTokens {
access_token: string;
refresh_token: string;
}
// Persist the token pair in localStorage under the 'auth_tokens' key.
// `getOnInit: true` makes the atom read the stored value when it is created.
// With the default (false) the first read returns `null`, so a page reload
// with valid stored tokens would briefly look logged-out to anything that
// reads this atom before it is mounted.
export const authTokensAtom = atomWithStorage<AuthTokens | null>(
  'auth_tokens',
  null,
  undefined, // keep the default storage
  { getOnInit: true }
);
// Current user info, persisted under the 'current_user' key.
// `getOnInit: true` reads the stored value when the atom is created, so the
// first read after a page reload returns the stored user rather than `null`.
export const currentUserAtom = atomWithStorage<User | null>(
  'current_user',
  null,
  undefined, // keep the default storage
  { getOnInit: true }
);
// Authentication status: true when a token pair is stored and its
// access_token is not the empty string.
export const isAuthenticatedAtom = atom((get) => {
  const stored = get(authTokensAtom);
  if (stored === null) {
    return false;
  }
  return stored.access_token !== '';
});
// Logout action: a write-only atom that clears the stored tokens and then
// the stored user.
export const logoutAtom = atom(null, (_get, set) => {
  set(authTokensAtom, null);
  set(currentUserAtom, null);
});
// web/apps/lq_label/src/services/auth-service.ts
import axios from 'axios';
// Backend base URL; falls back to http://localhost:8000 when
// REACT_APP_API_URL is unset or empty.
const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000';
/** Request body sent by `authService.register`. */
export interface RegisterData {
username: string;
email: string;
password: string;
}
/** Request body sent by `authService.login`. */
export interface LoginData {
username: string;
password: string;
}
/**
 * Response shape declared for `authService.login` and
 * `authService.refreshToken`; its `user` member is also the declared return
 * type of `authService.getCurrentUser`.
 */
export interface AuthResponse {
access_token: string;
refresh_token: string;
token_type: string;
user: {
id: string;
username: string;
email: string;
role: string;
created_at: string;
};
}
/**
 * Thin wrappers around the `/api/auth/*` endpoints. Each call uses the plain
 * `axios` instance and resolves with the response body.
 */
export const authService = {
  /** POST the registration payload to `/api/auth/register`. */
  register: async (
    data: RegisterData
  ): Promise<{ message: string; user_id: string }> => {
    const { data: body } = await axios.post(
      `${API_BASE_URL}/api/auth/register`,
      data
    );
    return body;
  },

  /** POST the credentials to `/api/auth/login`. */
  login: async (data: LoginData): Promise<AuthResponse> => {
    const { data: body } = await axios.post(
      `${API_BASE_URL}/api/auth/login`,
      data
    );
    return body;
  },

  /** POST the refresh token to `/api/auth/refresh`. */
  refreshToken: async (refreshToken: string): Promise<AuthResponse> => {
    const payload = { refresh_token: refreshToken };
    const { data: body } = await axios.post(
      `${API_BASE_URL}/api/auth/refresh`,
      payload
    );
    return body;
  },

  /** GET `/api/auth/me` with the given access token as a Bearer header. */
  getCurrentUser: async (
    accessToken: string
  ): Promise<AuthResponse['user']> => {
    const requestOptions = {
      headers: {
        Authorization: `Bearer ${accessToken}`,
      },
    };
    const { data: body } = await axios.get(
      `${API_BASE_URL}/api/auth/me`,
      requestOptions
    );
    return body;
  },
};
// web/apps/lq_label/src/utils/axios-config.ts
import axios, { AxiosError, InternalAxiosRequestConfig } from 'axios';
import { authTokensAtom, logoutAtom } from '../atoms/auth-atoms';
import { authService } from '../services/auth-service';
import { getDefaultStore } from 'jotai';
// Backend base URL; falls back to http://localhost:8000 when
// REACT_APP_API_URL is unset or empty.
const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000';
// Create the shared axios instance; requests default to a JSON content type.
export const apiClient = axios.create({
baseURL: API_BASE_URL,
headers: {
'Content-Type': 'application/json',
},
});
// Jotai default store, used to read and write atoms from this module.
const store = getDefaultStore();
// Request interceptor: attach the stored access token, when there is one,
// as a Bearer Authorization header.
apiClient.interceptors.request.use(
  (config: InternalAxiosRequestConfig) => {
    const accessToken = store.get(authTokensAtom)?.access_token;
    if (accessToken) {
      config.headers.Authorization = `Bearer ${accessToken}`;
    }
    return config;
  },
  (error) => Promise.reject(error)
);
// Response interceptor state for handling token expiry.
// True while a token refresh request is in flight.
let isRefreshing = false;
// Settlers for requests that received a 401 while a refresh was in flight;
// they are resolved or rejected by processQueue.
let failedQueue: Array<{
resolve: (value?: unknown) => void;
reject: (reason?: unknown) => void;
}> = [];
/**
 * Settle every queued request and then empty the queue: reject each one with
 * `error` when it is set, otherwise resolve each one with `token`.
 */
const processQueue = (error: Error | null, token: string | null = null) => {
  for (const { resolve, reject } of failedQueue) {
    if (error) {
      reject(error);
      continue;
    }
    resolve(token);
  }
  failedQueue = [];
};
// Response interceptor: on a 401, refresh the token pair once and replay the
// failed request. Requests that fail while a refresh is already in flight
// wait in `failedQueue` and are replayed with the new access token.
apiClient.interceptors.response.use(
  (response) => response,
  async (error: AxiosError) => {
    const originalRequest = error.config as InternalAxiosRequestConfig & {
      _retry?: boolean;
    };

    // Only a 401 that is not a login/register request, and has not already
    // been replayed, is eligible for the refresh flow.
    const isRecoverable401 =
      error.response?.status === 401 &&
      originalRequest &&
      !originalRequest._retry &&
      !originalRequest.url?.includes('/auth/login') &&
      !originalRequest.url?.includes('/auth/register');

    if (!isRecoverable401) {
      return Promise.reject(error);
    }

    // Mark the request before queuing or refreshing so that a replay which
    // still returns 401 cannot start another refresh cycle.
    originalRequest._retry = true;

    if (isRefreshing) {
      // A refresh is in flight: wait for its result, then replay.
      return new Promise((resolve, reject) => {
        failedQueue.push({ resolve, reject });
      }).then((token) => {
        originalRequest.headers.Authorization = `Bearer ${token}`;
        return apiClient(originalRequest);
      });
    }

    const tokens = store.get(authTokensAtom);
    if (!tokens?.refresh_token) {
      // No refresh token: log out. This check runs before `isRefreshing` is
      // set, so the early return cannot leave the flag stuck at true, which
      // would queue every later 401 forever.
      store.set(logoutAtom);
      return Promise.reject(error);
    }

    isRefreshing = true;
    try {
      // Try to refresh the token pair.
      const response = await authService.refreshToken(tokens.refresh_token);

      // Store the new tokens.
      store.set(authTokensAtom, {
        access_token: response.access_token,
        refresh_token: response.refresh_token,
      });

      // Release the requests that were waiting for the refresh.
      processQueue(null, response.access_token);

      // Replay the original request with the new access token.
      originalRequest.headers.Authorization = `Bearer ${response.access_token}`;
      return apiClient(originalRequest);
    } catch (refreshError) {
      // Refresh failed: fail the waiting requests and log the user out.
      processQueue(refreshError as Error, null);
      store.set(logoutAtom);
      return Promise.reject(refreshError);
    } finally {
      isRefreshing = false;
    }
  }
);
// web/apps/lq_label/src/components/login-form/login-form.tsx
import React, { useState } from 'react';
import { useAtom } from 'jotai';
import { useNavigate } from 'react-router-dom';
import { authTokensAtom, currentUserAtom } from '../../atoms/auth-atoms';
import { authService } from '../../services/auth-service';
/**
 * Login form: submits the credentials through `authService.login`, stores
 * the returned tokens and user in their atoms, then navigates to '/'.
 * On failure it shows the server's `detail` message when that is a
 * non-empty string, otherwise a generic message.
 */
export const LoginForm: React.FC = () => {
  const [, setTokens] = useAtom(authTokensAtom);
  const [, setCurrentUser] = useAtom(currentUserAtom);
  const navigate = useNavigate();
  const [formData, setFormData] = useState({
    username: '',
    password: '',
  });
  const [error, setError] = useState<string | null>(null);
  const [loading, setLoading] = useState(false);

  // Update one field with a functional state update, so the new state is
  // built from the latest form state rather than a stale render's copy.
  const updateField =
    (field: 'username' | 'password') =>
    (e: React.ChangeEvent<HTMLInputElement>) => {
      const { value } = e.target;
      setFormData((prev) => ({ ...prev, [field]: value }));
    };

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setError(null);
    setLoading(true);
    try {
      const response = await authService.login(formData);
      // Save the tokens and the user info.
      setTokens({
        access_token: response.access_token,
        refresh_token: response.refresh_token,
      });
      setCurrentUser(response.user);
      // Go to the home page.
      navigate('/');
    } catch (err: unknown) {
      // `detail` is not always a string (a validation failure can carry a
      // list of error objects); rendering a non-string would make React
      // throw, so only a non-empty string is shown.
      const detail = (
        err as { response?: { data?: { detail?: unknown } } } | null
      )?.response?.data?.detail;
      setError(
        typeof detail === 'string' && detail !== ''
          ? detail
          : '登录失败,请重试'
      );
    } finally {
      setLoading(false);
    }
  };

  return (
    <form onSubmit={handleSubmit} className="space-y-comfortable">
      <div>
        <label htmlFor="username" className="text-body-medium">
          用户名
        </label>
        <input
          id="username"
          type="text"
          value={formData.username}
          onChange={updateField('username')}
          required
          className="w-full p-comfortable border rounded"
        />
      </div>
      <div>
        <label htmlFor="password" className="text-body-medium">
          密码
        </label>
        <input
          id="password"
          type="password"
          value={formData.password}
          onChange={updateField('password')}
          required
          className="w-full p-comfortable border rounded"
        />
      </div>
      {error && (
        <div
          role="alert"
          className="text-error-foreground bg-error-background p-comfortable rounded"
        >
          {error}
        </div>
      )}
      <button
        type="submit"
        disabled={loading}
        className="w-full bg-primary-background text-primary-foreground p-comfortable rounded"
      >
        {loading ? '登录中...' : '登录'}
      </button>
    </form>
  );
};
// web/apps/lq_label/src/components/protected-route/protected-route.tsx
import React from 'react';
import { Navigate } from 'react-router-dom';
import { useAtomValue } from 'jotai';
import { isAuthenticatedAtom } from '../../atoms/auth-atoms';
/** Props for `ProtectedRoute`. */
interface ProtectedRouteProps {
// Content rendered only when the user is authenticated.
children: React.ReactNode;
}
/**
 * Route guard: renders `children` when `isAuthenticatedAtom` is true,
 * otherwise redirects to "/login" (replacing the current history entry).
 */
export const ProtectedRoute: React.FC<ProtectedRouteProps> = ({ children }) => {
  const isAuthenticated = useAtomValue(isAuthenticatedAtom);
  return isAuthenticated ? (
    <>{children}</>
  ) : (
    <Navigate to="/login" replace />
  );
};
// web/apps/lq_label/src/app/app.tsx
import { BrowserRouter, Routes, Route } from 'react-router-dom';
import { LoginForm } from '../components/login-form/login-form';
import { RegisterForm } from '../components/register-form/register-form';
import { ProtectedRoute } from '../components/protected-route/protected-route';
import { ProjectList } from '../views/project-list/project-list';
/**
 * Application routes: "/login" and "/register" are public; "/" renders the
 * project list behind `ProtectedRoute`.
 */
export function App() {
  const guardedProjectList = (
    <ProtectedRoute>
      <ProjectList />
    </ProtectedRoute>
  );

  return (
    <BrowserRouter>
      <Routes>
        <Route path="/login" element={<LoginForm />} />
        <Route path="/register" element={<RegisterForm />} />
        <Route path="/" element={guardedProjectList} />
        {/* Other protected routes */}
      </Routes>
    </BrowserRouter>
  );
}
# web/apps/lq_label/.env
REACT_APP_API_URL=http://localhost:8000