将标注平台从"本地 JWT 签发 + SSO 身份认证"模式迁移为"SSO 统一 token 签发"模式。核心变化是:后端不再签发 JWT,而是直接透传 SSO 中心的 token;中间件通过调用 SSO 的 /oauth/userinfo 端点验证 token 有效性,并引入内存缓存减少对 SSO 中心的请求压力。
改造前:
用户 → SSO 登录 → 获取 SSO token → 后端用 SSO token 拉取用户信息 → 后端签发本地 JWT → 前端用本地 JWT 访问 API
改造后:
用户 → SSO 登录 → 获取 SSO token → 后端透传 SSO token 给前端 → 前端用 SSO token 访问 API → 后端向 SSO 验证 token
sequenceDiagram
participant User as 用户/前端
participant Platform as 标注平台后端
participant SSO as SSO 认证中心
participant Cache as Token 缓存
participant DB as 本地数据库
Note over User,SSO: === SSO 登录流程 ===
User->>Platform: GET /api/oauth/login
Platform->>User: 返回 authorization_url + state
User->>SSO: 重定向到 SSO 登录页
SSO->>User: 用户授权后重定向回 callback (带 code)
User->>Platform: GET /api/oauth/callback?code=xxx&state=yyy
Platform->>SSO: POST /oauth/token (用 code 换 token)
SSO->>Platform: 返回 access_token + refresh_token
Platform->>SSO: GET /oauth/userinfo (用 access_token)
SSO->>Platform: 返回用户信息 {code:0, data:{...}}
Platform->>DB: 同步用户信息到本地
Platform->>Cache: 缓存 token → user 映射
Platform->>User: 返回 SSO access_token + refresh_token + user
Note over User,SSO: === API 请求认证流程 ===
User->>Platform: API 请求 (Authorization: Bearer SSO_token)
Platform->>Cache: 查询 token 缓存
alt 缓存命中
Cache->>Platform: 返回缓存的用户信息
else 缓存未命中
Platform->>SSO: GET /oauth/userinfo (验证 token)
SSO->>Platform: 返回用户信息
Platform->>Cache: 缓存 token → user 映射
end
Platform->>User: 返回 API 响应
Note over User,SSO: === Token 刷新流程 ===
User->>Platform: POST /api/oauth/refresh (refresh_token)
Platform->>SSO: POST /oauth/token (grant_type=refresh_token)
SSO->>Platform: 返回新的 access_token + refresh_token
Platform->>User: 返回新 tokens
内存级 token 缓存服务,减少对 SSO 中心的请求。
class TokenCacheService:
"""SSO Token 内存缓存服务"""
def __init__(self, ttl_seconds: int = 300):
"""
Args:
ttl_seconds: 缓存过期时间,默认 300 秒(5 分钟)
"""
self._cache: Dict[str, CacheEntry] = {}
self._ttl = ttl_seconds
def get(self, token: str) -> Optional[Dict]:
"""查询缓存,返回用户信息或 None"""
...
def set(self, token: str, user_info: Dict) -> None:
"""写入缓存"""
...
def invalidate(self, token: str) -> None:
"""使指定 token 缓存失效"""
...
def clear(self) -> None:
"""清空所有缓存"""
...
从本地 JWT 验证改为 SSO token 验证 + 缓存。
# 改造前:
payload = JWTService.verify_token(token, "access")
# 改造后:
# 1. 先查缓存
user_info = token_cache.get(token)
if not user_info:
# 2. 缓存未命中,调 SSO userinfo 验证
user_info = await sso_verify_token(token)
# 3. 验证成功则写入缓存
token_cache.set(token, user_info)
关键变化:
JWTService.verify_token 的依赖exchange_code_for_token: 保持不变,继续用授权码换 tokenget_user_info: 保持不变,继续获取用户信息sync_user_from_oauth: 保持不变,继续同步用户到本地refresh_sso_token: 转发 refresh 请求到 SSO 中心新增 verify_sso_token: 用 SSO userinfo 端点验证 token
@staticmethod
async def verify_sso_token(access_token: str) -> Dict[str, Any]:
"""
通过 SSO userinfo 端点验证 token 并获取用户信息
Returns:
用户信息字典 {id, username, email, ...}
Raises:
HTTPException(401): token 无效
HTTPException(503): SSO 不可用
"""
...
@staticmethod
async def refresh_sso_token(refresh_token: str) -> Dict[str, Any]:
"""
向 SSO 中心刷新 token
Returns:
新的 token 信息 {access_token, refresh_token, ...}
Raises:
HTTPException(401): refresh_token 无效
HTTPException(503): SSO 不可用
"""
...
/api/oauth/callback: 不再调用 JWTService,直接返回 SSO token/api/oauth/refresh(新增): 转发 refresh 请求到 SSO 中心/api/oauth/login: 保持不变/api/oauth/status: 保持不变移除以下端点:
POST /api/auth/register — 不再支持本地注册POST /api/auth/login — 不再支持本地登录POST /api/auth/refresh — 刷新改走 /api/oauth/refreshGET /api/auth/me — 改为 GET /api/oauth/me(从 SSO 验证后的 request.state 获取)整个 jwt_service.py 不再需要,因为不再本地签发或验证 JWT。
移除 register_user、login_user、refresh_tokens 方法。
保留 get_current_user(从数据库查询用户详情)。
回调后存储的是 SSO token 而非本地 JWT:
// 改造前:
setAuth({
tokens: {
access_token: response.access_token, // 本地 JWT
refresh_token: response.refresh_token, // 本地 JWT
token_type: response.token_type,
},
user: response.user,
});
// 改造后(结构不变,内容变了):
setAuth({
tokens: {
access_token: response.access_token, // SSO token
refresh_token: response.refresh_token, // SSO refresh token
token_type: response.token_type,
},
user: response.user,
});
实际上前端存储结构不需要变化,只是 token 的来源变了。
刷新端点从 /api/auth/refresh 改为 /api/oauth/refresh:
// 改造前:
const response = await axios.post(`${API_BASE_URL}/api/auth/refresh`, {
refresh_token: tokens.refresh_token,
});
// 改造后:
const response = await axios.post(`${API_BASE_URL}/api/oauth/refresh`, {
refresh_token: tokens.refresh_token,
});
移除本地登录表单,直接发起 SSO 登录:
// 改造后的 LoginForm:
// 页面加载时自动检查 OAuth 状态并跳转 SSO
useEffect(() => {
startOAuthLogin();
}, []);
移除注册页面和相关路由。
PUBLIC_PATHS = {
"/",
"/health",
"/docs",
"/openapi.json",
"/redoc",
"/api/oauth/status",
"/api/oauth/login",
"/api/oauth/callback",
"/api/oauth/refresh",
}
@dataclass
class CacheEntry:
user_info: Dict[str, Any] # {id, username, email, role, ...}
created_at: float # time.time() 时间戳
def is_expired(self, ttl: float) -> bool:
return (time.time() - self.created_at) > ttl
根据 SSO demo 代码,SSO 中心的 userinfo 响应格式为:
{
"code": 0,
"data": {
"id": "user_123",
"username": "zhangsan",
"email": "zhangsan@example.com",
"name": "张三"
}
}
{
"access_token": "eyJhbGciOiJSUzI1NiIs...",
"refresh_token": "dGhpcyBpcyBhIHJlZnJlc2g...",
"token_type": "bearer",
"expires_in": 3600
}
或包装格式:
{
"code": 0,
"data": {
"access_token": "eyJhbGciOiJSUzI1NiIs...",
"refresh_token": "dGhpcyBpcyBhIHJlZnJlc2g...",
"token_type": "bearer",
"expires_in": 3600
}
}
# 移除 jwt 配置段
# jwt:
# secret_key: "..."
# algorithm: "HS256"
# access_token_expire_minutes: 60
# refresh_token_expire_days: 7
# 保留并增强 oauth 配置
oauth:
enabled: true
base_url: "http://192.168.92.61:8000"
client_id: "nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx"
client_secret: "wh0HU_9T83rYMjfLFToNxFOKcrk_8H7Ba_27nNGlPqtTf9ROCytsOgp2ue0ol5mm"
redirect_uri: "http://localhost:4200/auth/callback"
scope: "profile email"
authorize_endpoint: "/oauth/login"
token_endpoint: "/oauth/token"
userinfo_endpoint: "/oauth/userinfo"
revoke_endpoint: "/oauth/revoke"
token_cache_ttl: 300 # 新增:缓存 TTL(秒)
无数据库 schema 变更。users 表保持不变,继续通过 oauth_provider 和 oauth_id 关联 SSO 用户。
A property is a characteristic or behavior that should hold true across all valid executions of a system — essentially, a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.
For any token that has been previously verified and cached (within TTL), querying the cache with that token should return the same user information that was originally cached, without making any call to the SSO center.
Validates: Requirements 3.1, 3.2
For any valid SSO token, after the Auth_Middleware successfully verifies it against the SSO center, the token-to-user mapping should be present in the cache, and a subsequent cache lookup for the same token should return the user info.
Validates: Requirements 3.4
For any token that the SSO center rejects (returns non-zero code or HTTP error), the Auth_Middleware should return HTTP 401 and the token should not be cached.
Validates: Requirements 3.5
For any cached token entry, after the configured TTL has elapsed, the cache should return None for that token, forcing re-verification against the SSO center.
Validates: Requirements 3.7
For any SSO user information received during OAuth callback, after sync_user_from_oauth completes, the local database should contain a user record where oauth_provider equals "sso", oauth_id matches the SSO-provided ID, and username and email match the latest SSO-provided values.
Validates: Requirements 5.2, 5.3, 5.4
| 场景 | HTTP 状态码 | 错误类型 | 描述 |
|---|---|---|---|
| 缺少 Authorization header | 401 | missing_token |
请求未携带认证令牌 |
| Bearer 格式错误 | 401 | invalid_token_format |
Authorization header 格式不正确 |
| SSO 验证失败(token 无效) | 401 | invalid_token |
SSO 中心拒绝该 token |
| SSO 验证失败(token 过期) | 401 | token_expired |
SSO token 已过期 |
| SSO 中心不可用 | 503 | sso_unavailable |
无法连接 SSO 认证中心 |
| OAuth 未启用 | 503 | sso_not_configured |
OAuth/SSO 配置未启用 |
| Refresh token 无效 | 401 | invalid_refresh_token |
SSO 刷新令牌无效或已过期 |
| 用户信息同步失败 | 500 | user_sync_error |
无法将 SSO 用户同步到本地数据库 |
保持与现有系统一致:
{
"detail": "错误描述信息",
"error_type": "error_type_code"
}
pytest + pytest-asyncio(异步测试)hypothesis(Python property-based testing 库)TokenCacheService 测试
OAuthService 新方法测试
verify_sso_token 成功/失败场景refresh_sso_token 成功/失败场景AuthMiddleware 测试
前端测试
每个属性测试至少运行 100 次迭代。
测试标注格式:Feature: sso-token-unification, Property {N}: {property_text}