Selaa lähdekoodia

按AI能力中台 统一认证平台授权逻辑修改

lingmin_package@163.com 3 viikkoa sitten
vanhempi
sitoutus
82f861d106

+ 0 - 259
backend/JWT_AUTHENTICATION_GUIDE.md

@@ -1,259 +0,0 @@
-# JWT 认证系统快速指南
-
-## 概述
-
-本系统已成功集成 JWT(JSON Web Token)认证体系,提供完整的用户注册、登录、令牌刷新和基于角色的访问控制功能。
-
-## 主要特性
-
-✅ **用户注册和登录**
-- 用户名和邮箱唯一性验证
-- 密码使用 bcrypt 加密存储
-- 自动分配默认角色(annotator)
-
-✅ **JWT 令牌管理**
-- Access Token(15分钟有效期)
-- Refresh Token(7天有效期)
-- Token Rotation 机制
-
-✅ **中间件认证**
-- 自动验证所有受保护端点
-- 公开端点无需认证
-- 用户信息自动附加到请求
-
-✅ **基于角色的访问控制**
-- annotator:普通标注员
-- admin:管理员(可删除资源)
-
-✅ **自动用户关联**
-- 创建任务时自动分配给当前用户
-- 创建标注时自动使用当前用户 ID
-- 用户只能查看和修改自己的标注
-
-## 快速开始
-
-### 1. 配置环境变量
-
-```bash
-# 复制环境变量模板
-cp .env.example .env
-
-# 生成安全的 JWT 密钥
-python -c "import secrets; print(secrets.token_urlsafe(32))"
-
-# 编辑 .env 文件,设置 JWT_SECRET_KEY
-```
-
-### 2. 启动服务器
-
-```bash
-python main.py
-```
-
-服务器将在 http://localhost:8000 启动
-
-### 3. 测试认证流程
-
-运行测试脚本:
-
-```powershell
-.\test_auth_flow.ps1
-```
-
-或手动测试:
-
-```bash
-# 注册用户
-curl -X POST "http://localhost:8000/api/auth/register" \
-  -H "Content-Type: application/json" \
-  -d '{"username": "testuser", "email": "test@example.com", "password": "password123"}'
-
-# 登录
-curl -X POST "http://localhost:8000/api/auth/login" \
-  -H "Content-Type: application/json" \
-  -d '{"username": "testuser", "password": "password123"}'
-
-# 使用 token 访问受保护端点
-curl -X GET "http://localhost:8000/api/auth/me" \
-  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
-```
-
-## API 端点
-
-### 认证端点(公开)
-
-| 端点 | 方法 | 描述 |
-|------|------|------|
-| `/api/auth/register` | POST | 用户注册 |
-| `/api/auth/login` | POST | 用户登录 |
-| `/api/auth/refresh` | POST | 刷新令牌 |
-
-### 认证端点(需要 token)
-
-| 端点 | 方法 | 描述 |
-|------|------|------|
-| `/api/auth/me` | GET | 获取当前用户信息 |
-
-### 受保护端点
-
-所有 `/api/projects/*`、`/api/tasks/*`、`/api/annotations/*` 端点都需要认证。
-
-### 管理员专用端点
-
-| 端点 | 方法 | 描述 | 角色要求 |
-|------|------|------|----------|
-| `DELETE /api/projects/{id}` | DELETE | 删除项目 | admin |
-| `DELETE /api/tasks/{id}` | DELETE | 删除任务 | admin |
-
-## 权限规则
-
-### 项目(Projects)
-- ✅ 所有认证用户可以创建、查看、更新项目
-- ❌ 只有管理员可以删除项目
-
-### 任务(Tasks)
-- ✅ 所有认证用户可以创建、查看、更新任务
-- ✅ 创建任务时,如果未指定 `assigned_to`,自动分配给当前用户
-- ❌ 只有管理员可以删除任务
-
-### 标注(Annotations)
-- ✅ 所有认证用户可以创建标注
-- ✅ 创建标注时,自动使用当前用户 ID(忽略请求中的 user_id)
-- ✅ 用户只能查看和修改自己的标注
-- ✅ 管理员可以查看和修改所有标注
-
-## 错误处理
-
-### 401 Unauthorized
-
-**原因**:
-- 缺少 Authorization header
-- Token 格式错误
-- Token 已过期
-- Token 无效
-
-**解决方案**:
-- 确保包含 `Authorization: Bearer <token>` header
-- 使用 refresh token 获取新的 access token
-
-### 403 Forbidden
-
-**原因**:
-- 用户角色权限不足
-- 尝试访问其他用户的资源
-
-**解决方案**:
-- 联系管理员提升权限
-- 只访问自己的资源
-
-## 安全建议
-
-### 开发环境
-- ✅ 可以使用自动生成的 JWT_SECRET_KEY
-- ✅ 使用 HTTP(localhost)
-
-### 生产环境
-- ⚠️ **必须**设置强随机的 JWT_SECRET_KEY
-- ⚠️ **必须**使用 HTTPS
-- ⚠️ **必须**配置正确的 CORS 设置
-- ⚠️ 建议启用速率限制
-- ⚠️ 建议添加日志审计
-
-## 令牌生命周期
-
-```
-注册/登录
-    ↓
-获得 Access Token (15分钟) + Refresh Token (7天)
-    ↓
-使用 Access Token 访问 API
-    ↓
-Access Token 过期
-    ↓
-使用 Refresh Token 获取新的 Access Token
-    ↓
-继续使用新的 Access Token
-    ↓
-Refresh Token 过期
-    ↓
-需要重新登录
-```
-
-## 数据库结构
-
-### users 表
-
-| 字段 | 类型 | 描述 |
-|------|------|------|
-| id | TEXT | 用户唯一标识 |
-| username | TEXT | 用户名(唯一) |
-| email | TEXT | 邮箱(唯一) |
-| password_hash | TEXT | 密码哈希 |
-| role | TEXT | 用户角色 |
-| oauth_provider | TEXT | OAuth 提供商(预留) |
-| oauth_id | TEXT | OAuth ID(预留) |
-| created_at | TIMESTAMP | 创建时间 |
-
-## 测试结果
-
-所有测试已通过 ✅
-
-1. ✅ 用户注册
-2. ✅ 用户登录
-3. ✅ 访问受保护端点(带 token)
-4. ✅ 拒绝未认证请求(无 token)
-5. ✅ 创建项目(认证)
-6. ✅ 创建任务(自动分配给当前用户)
-7. ✅ 创建标注(自动使用当前用户 ID)
-8. ✅ 令牌刷新
-9. ✅ 拒绝非管理员删除操作
-
-## 下一步
-
-### 可选功能(未实现)
-
-- [ ] 前端认证集成(React + Jotai)
-- [ ] OAuth 2.0 集成(Google, GitHub)
-- [ ] 邮箱验证
-- [ ] 密码重置
-- [ ] 用户资料管理
-- [ ] 审计日志
-- [ ] 速率限制
-- [ ] 双因素认证(2FA)
-
-### 实现前端认证
-
-参考 `.kiro/specs/jwt-authentication/tasks.md` 中的 Task 14,包含:
-
-1. 创建认证 Atoms(Jotai)
-2. 创建认证服务
-3. 配置 Axios 拦截器
-4. 创建登录/注册组件
-5. 实现路由保护
-
-## 故障排除
-
-### 问题:服务器启动时提示"使用默认生成的 JWT_SECRET_KEY"
-
-**解决方案**:这是正常的开发环境提示。生产环境请在 `.env` 文件中设置 `JWT_SECRET_KEY`。
-
-### 问题:Token 一直提示过期
-
-**解决方案**:
-1. 检查系统时间是否正确
-2. 使用 refresh token 获取新的 access token
-3. 如果 refresh token 也过期,需要重新登录
-
-### 问题:无法访问其他用户的标注
-
-**解决方案**:这是设计行为。普通用户只能访问自己的标注,管理员可以访问所有标注。
-
-## 联系方式
-
-如有问题或建议,请联系项目维护者。
-
----
-
-**文档版本**: 1.0  
-**最后更新**: 2024-01-22  
-**状态**: ✅ 生产就绪

+ 16 - 17
backend/config.py

@@ -64,23 +64,22 @@ class Settings:
         self.MYSQL_PASSWORD = mysql_config.get('password', '')
         self.MYSQL_DATABASE = mysql_config.get('database', 'annotation_platform')
 
-        # OAuth/SSO Settings
-        oauth_config = config.get('oauth', {})
-        self.OAUTH_ENABLED = oauth_config.get('enabled', False)
-        self.OAUTH_BASE_URL = oauth_config.get('base_url', '')
-        self.OAUTH_CLIENT_ID = oauth_config.get('client_id', '')
-        self.OAUTH_CLIENT_SECRET = oauth_config.get('client_secret', '')
-        self.OAUTH_REDIRECT_URI = oauth_config.get('redirect_uri', '')
-        self.OAUTH_SCOPE = oauth_config.get('scope', 'profile email')
-
-        # OAuth Endpoints
-        self.OAUTH_AUTHORIZE_ENDPOINT = oauth_config.get('authorize_endpoint', '/oauth/authorize')
-        self.OAUTH_TOKEN_ENDPOINT = oauth_config.get('token_endpoint', '/oauth/token')
-        self.OAUTH_USERINFO_ENDPOINT = oauth_config.get('userinfo_endpoint', '/oauth/userinfo')
-        self.OAUTH_REVOKE_ENDPOINT = oauth_config.get('revoke_endpoint', '/oauth/revoke')
-
-        # Token Cache TTL (seconds)
-        self.TOKEN_CACHE_TTL = oauth_config.get('token_cache_ttl', 300)
+        # SSO/SSO Settings
+        sso_config = config.get('sso', {})
+        self.SSO_ENABLED = sso_config.get('enabled', True)
+        self.SSO_BASE_URL = sso_config.get('base_url', '')
+        self.SSO_CLIENT_ID = sso_config.get('client_id', '')
+        self.SSO_CLIENT_SECRET = sso_config.get('client_secret', '')
+        self.SSO_REDIRECT_URI = sso_config.get('redirect_uri', '')
+        self.SSO_SCOPE = sso_config.get('scope', 'profile email')
+        self.SSO_LOGOUT_REDIRECT_URL = sso_config.get('logout_redirect_url', '')
+        self.SSO_FRONTEND_URL = sso_config.get('frontend_url', '')
+
+        # SSO Endpoints
+        self.SSO_AUTHORIZE_ENDPOINT = sso_config.get('authorize_endpoint', '/oauth/authorize')
+        self.SSO_TOKEN_ENDPOINT = sso_config.get('token_endpoint', '/oauth/token')
+        self.SSO_USERINFO_ENDPOINT = sso_config.get('userinfo_endpoint', '/oauth/userinfo')
+        self.SSO_REVOKE_ENDPOINT = sso_config.get('revoke_endpoint', '/oauth/revoke')
 
         # Server Settings
         server_config = config.get('server', {})

+ 8 - 12
backend/config/config.dev.yaml

@@ -1,20 +1,21 @@
 # 开发环境配置
 
-# OAuth 2.0 单点登录配置
-oauth:
-  enabled: true
-  base_url: "http://192.168.92.61:8000"
+# SSO 单点登录配置(统一认证平台)
+sso:
+  base_url: "http://192.168.92.61:8200"
   client_id: "nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx"
   client_secret: "wh0HU_9T83rYMjfLFToNxFOKcrk_8H7Ba_27nNGlPqtTf9ROCytsOgp2ue0ol5mm"
   redirect_uri: "http://localhost:4200/auth/callback"
   scope: "profile email"
+  logout_redirect_url: "http://192.168.92.61:9200/login"
+  frontend_url: "http://localhost:4200"
   
-  # OAuth 端点
-  authorize_endpoint: "/oauth/login"
+
+  # SSO 端点
+  authorize_endpoint: "/oauth/authorize"
   token_endpoint: "/oauth/token"
   userinfo_endpoint: "/oauth/userinfo"
   revoke_endpoint: "/oauth/revoke"
-  token_cache_ttl: 300  # Token 缓存 TTL(秒)
 
 # 数据库配置 (MySQL)
 database:
@@ -24,11 +25,6 @@ database:
     user: "root"
     password: "Lq123456!"
     database: "lq_label_dev"
-    # host: "192.168.92.96"
-    # port: 30199
-    # user: "root"
-    # password: "Lq123456!"
-    # database: "lq_label_test"
 
 # 服务器配置
 server:

+ 12 - 12
backend/config/config.prod.yaml

@@ -1,29 +1,29 @@
 # 生产环境配置
 
-# OAuth 2.0 单点登录配置
-oauth:
-  enabled: true
-  base_url: "http://192.168.92.61:8000"
+# SSO 单点登录配置(统一认证平台)
+sso:
+  base_url: "http://192.168.92.61:8200"
   client_id: "nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx"
   client_secret: "wh0HU_9T83rYMjfLFToNxFOKcrk_8H7Ba_27nNGlPqtTf9ROCytsOgp2ue0ol5mm"
-  redirect_uri: "http://192.168.92.61:9003/auth/callback"
+  redirect_uri: "http://localhost:4200/auth/callback"
   scope: "profile email"
-  
-  # OAuth 端点
-  authorize_endpoint: "/oauth/login"
+  logout_redirect_url: "http://localhost:3000/login"
+  frontend_url: "http://localhost:4200"
+
+  # SSO 端点
+  authorize_endpoint: "/oauth/authorize"
   token_endpoint: "/oauth/token"
   userinfo_endpoint: "/oauth/userinfo"
   revoke_endpoint: "/oauth/revoke"
-  token_cache_ttl: 300  # Token 缓存 TTL(秒)
 
 # 数据库配置 (MySQL)
 database:
   mysql:
-    host: "192.168.92.61"
-    port: 13306
+    host: "192.168.92.96"
+    port: 30199
     user: "root"
     password: "Lq123456!"
-    database: "lq_label_dev"
+    database: "lq_label_test"
 
 # 服务器配置
 server:

+ 12 - 12
backend/config/config.test.yaml

@@ -1,29 +1,29 @@
-# 开发环境配置
+# 测试环境配置
 
-# OAuth 2.0 单点登录配置
-oauth:
-  enabled: true
-  base_url: "http://192.168.92.62:8000"
+# SSO 单点登录配置(统一认证平台)
+sso:
+  base_url: "http://192.168.92.61:8200"
   client_id: "nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx"
   client_secret: "wh0HU_9T83rYMjfLFToNxFOKcrk_8H7Ba_27nNGlPqtTf9ROCytsOgp2ue0ol5mm"
   redirect_uri: "http://localhost:4200/auth/callback"
   scope: "profile email"
-  
-  # OAuth 端点
-  authorize_endpoint: "/oauth/login"
+  logout_redirect_url: "http://localhost:3000/login"
+  frontend_url: "http://localhost:4200"
+
+  # SSO 端点
+  authorize_endpoint: "/oauth/authorize"
   token_endpoint: "/oauth/token"
   userinfo_endpoint: "/oauth/userinfo"
   revoke_endpoint: "/oauth/revoke"
-  token_cache_ttl: 300  # Token 缓存 TTL(秒)
 
 # 数据库配置 (MySQL)
 database:
   mysql:
-    host: "192.168.92.96"
-    port: 30199
+    host: "192.168.92.61"
+    port: 13306
     user: "root"
     password: "Lq123456!"
-    database: "lq_label_test"
+    database: "lq_label_dev"
 
 # 服务器配置
 server:

+ 50 - 109
backend/middleware/auth_middleware.py

@@ -1,7 +1,6 @@
 """
-Authentication Middleware for SSO token verification.
-Validates SSO tokens via the SSO center's userinfo endpoint,
-with an in-memory cache to reduce external calls.
+Authentication Middleware for JWT verification.
+Validates locally-signed JWT tokens.
 
 Also supports admin tokens generated by generate_admin_token.py script.
 """
@@ -10,39 +9,29 @@ from datetime import datetime, timezone
 from fastapi import Request, HTTPException, status
 from fastapi.responses import JSONResponse
 from starlette.middleware.base import BaseHTTPMiddleware
-from services.token_cache_service import TokenCacheService
-from services.oauth_service import OAuthService
+from services import jwt_service
 from config import settings
 from database import get_db_connection
 
 logger = logging.getLogger(__name__)
 
-# 全局 token 缓存实例
-# SSO token 有效期 600 秒,缓存设置为 550 秒(留 50 秒余量)
-token_cache = TokenCacheService(
-    ttl_seconds=getattr(settings, 'TOKEN_CACHE_TTL', 550)
-)
-
 
 def verify_admin_token(token: str) -> dict:
     """
     验证管理员 Token(从数据库查询)
-
-    Args:
-        token: Token 字符串
-
-    Returns:
-        dict: 用户信息字典,或 None(Token 无效或已过期)
     """
     try:
         with get_db_connection() as conn:
             cursor = conn.cursor()
-            cursor.execute("""
+            cursor.execute(
+                """
                 SELECT at.user_id, u.username, u.email, u.role, at.expires_at
                 FROM admin_tokens at
                 JOIN users u ON at.user_id = u.id
                 WHERE at.token = %s AND at.expires_at > %s
-            """, (token, datetime.now(timezone.utc)))
+                """,
+                (token, datetime.now(timezone.utc)),
+            )
             row = cursor.fetchone()
 
             if not row:
@@ -62,8 +51,8 @@ def verify_admin_token(token: str) -> dict:
 
 class AuthMiddleware(BaseHTTPMiddleware):
     """
-    SSO Token 认证中间件。
-    先查本地缓存,未命中则调用 SSO userinfo 端点验证
+    JWT 认证中间件。
+    验证本地签发的 JWT,或管理员 Token
     """
 
     PUBLIC_PATHS = {
@@ -74,31 +63,20 @@ class AuthMiddleware(BaseHTTPMiddleware):
         "/redoc",
         "/api/oauth/status",
         "/api/oauth/login",
-        "/api/oauth/callback",
+        "/api/oauth/exchange-code",
         "/api/oauth/refresh",
+        "/api/oauth/logout",
     }
 
     async def dispatch(self, request: Request, call_next):
         # Skip authentication for public paths
-        logger.debug(f"AuthMiddleware: path={request.url.path}, method={request.method}")
         if request.url.path in self.PUBLIC_PATHS:
-            logger.debug(f"Skipping auth for public path: {request.url.path}")
             return await call_next(request)
 
         # Skip authentication for OPTIONS requests (CORS preflight)
         if request.method == "OPTIONS":
             return await call_next(request)
 
-        # Check if OAuth/SSO is enabled
-        if not settings.OAUTH_ENABLED:
-            return JSONResponse(
-                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
-                content={
-                    "detail": "SSO 认证未配置",
-                    "error_type": "sso_not_configured"
-                }
-            )
-
         # Extract token from Authorization header
         auth_header = request.headers.get("Authorization")
 
@@ -107,8 +85,8 @@ class AuthMiddleware(BaseHTTPMiddleware):
                 status_code=status.HTTP_401_UNAUTHORIZED,
                 content={
                     "detail": "缺少认证令牌",
-                    "error_type": "missing_token"
-                }
+                    "error_type": "missing_token",
+                },
             )
 
         # Verify Bearer token format
@@ -118,68 +96,45 @@ class AuthMiddleware(BaseHTTPMiddleware):
                 status_code=status.HTTP_401_UNAUTHORIZED,
                 content={
                     "detail": "无效的认证令牌格式",
-                    "error_type": "invalid_token_format"
-                }
+                    "error_type": "invalid_token_format",
+                },
             )
 
-        sso_token = parts[1]
+        token = parts[1]
 
         try:
             # 1. 先检查是否是管理员 Token(以 admin_token_ 开头)
             user_info = None
-            if sso_token.startswith("admin_token_"):
-                logger.debug("检测到管理员 Token,尝试从数据库验证")
-                user_info = verify_admin_token(sso_token)
+            if token.startswith("admin_token_"):
+                user_info = verify_admin_token(token)
                 if user_info:
                     logger.info(f"管理员 Token 验证成功:{user_info['username']}")
 
-            # 2. 如果不是管理员 Token,查本地缓存(SSO token)
+            # 2. 如果不是管理员 Token,验证 JWT
             if user_info is None:
-                user_info = token_cache.get(sso_token)
-
-            # 3. 缓存未命中,调 SSO profile 验证(含角色信息)
-            if user_info is None:
-                user_info = await OAuthService.verify_sso_token(sso_token)
-
-                # 3. 同步用户到本地数据库(更新角色),获取本地用户ID
-                try:
-                    local_user = OAuthService.sync_user_from_oauth(oauth_user_info=user_info)
-                    # 将本地user.id也存入user_info,供后续使用
-                    user_info["local_user_id"] = local_user.id
-                except Exception as sync_err:
-                    logger.warning(f"用户同步失败(不影响认证): {sync_err}")
-
-                # 4. 写入缓存
-                token_cache.set(sso_token, user_info)
-
-            # 提取用户信息,优先使用本地用户ID
-            # 如果缓存中没有 local_user_id,则重新同步用户
-            local_user_id = user_info.get("local_user_id")
-            if not local_user_id:
-                try:
-                    local_user = OAuthService.sync_user_from_oauth(oauth_user_info=user_info)
-                    local_user_id = local_user.id
-                    # 更新缓存,避免下次重复同步
-                    user_info["local_user_id"] = local_user_id
-                    token_cache.set(sso_token, user_info)
-                except Exception as sync_err:
-                    logger.warning(f"重新同步用户失败,使用SSO ID: {sync_err}")
-            
-            user_id = local_user_id or user_info.get("id") or user_info.get("sub") if not user_info.get("is_admin_token") else user_info.get("id")
-            username = (
-                user_info.get("username")
-                or user_info.get("preferred_username")
-                or user_info.get("name")
-            )
-            email = user_info.get("email", "")
-            role = user_info.get("role", "viewer")
+                payload = jwt_service.verify_token(token)
+                role = payload.get("role", "")
+                if role not in ("admin", "annotator", "viewer"):
+                    return JSONResponse(
+                        status_code=status.HTTP_403_FORBIDDEN,
+                        content={
+                            "detail": "未被识别的 SSO 角色,无权限访问",
+                            "error_type": "unrecognized_role",
+                        },
+                    )
+                user_info = {
+                    "id": payload.get("sub"),
+                    "username": payload.get("username"),
+                    "email": payload.get("email", ""),
+                    "role": role,
+                }
 
             # Attach user info to request state
             request.state.user = {
-                "id": str(user_id),
-                "username": username,
-                "email": email,
-                "role": role,
+                "id": str(user_info["id"]),
+                "username": user_info["username"],
+                "email": user_info["email"],
+                "role": user_info["role"],
             }
 
             response = await call_next(request)
@@ -187,21 +142,15 @@ class AuthMiddleware(BaseHTTPMiddleware):
 
         except HTTPException as e:
             error_type = "invalid_token"
-            if e.status_code == 503:
-                error_type = "sso_unavailable"
-            elif e.status_code == 401:
-                # SSO 返回 401 说明 token 过期或无效,统一标记为 token_expired
-                # 让前端有机会用 refresh_token 刷新
+            if e.status_code == 401:
                 error_type = "token_expired"
-                # 同时清除本地缓存中的过期 token
-                token_cache.invalidate(sso_token)
 
             return JSONResponse(
                 status_code=e.status_code,
                 content={
                     "detail": e.detail,
-                    "error_type": error_type
-                }
+                    "error_type": error_type,
+                },
             )
         except Exception as e:
             logger.error("认证过程发生错误:%s", str(e))
@@ -209,26 +158,16 @@ class AuthMiddleware(BaseHTTPMiddleware):
                 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                 content={
                     "detail": "认证过程发生错误",
-                    "error_type": "auth_error"
-                }
+                    "error_type": "auth_error",
+                },
             )
 
 
 def require_role(*allowed_roles: str):
     """
     Decorator to check user role.
-
-    Usage:
-        @require_role("admin", "annotator")
-        async def my_endpoint(request: Request):
-            ...
-
-    Args:
-        allowed_roles: Tuple of allowed role names
-
-    Returns:
-        Decorator function
     """
+
     def decorator(func):
         async def wrapper(request: Request, *args, **kwargs):
             user = getattr(request.state, "user", None)
@@ -236,15 +175,17 @@ def require_role(*allowed_roles: str):
             if not user:
                 raise HTTPException(
                     status_code=status.HTTP_401_UNAUTHORIZED,
-                    detail="未认证"
+                    detail="未认证",
                 )
 
             if user["role"] not in allowed_roles:
                 raise HTTPException(
                     status_code=status.HTTP_403_FORBIDDEN,
-                    detail="权限不足"
+                    detail="权限不足",
                 )
 
             return await func(request, *args, **kwargs)
+
         return wrapper
+
     return decorator

+ 124 - 99
backend/routers/oauth.py

@@ -1,16 +1,15 @@
 """
 OAuth 2.0 认证路由
-处理 SSO 登录流程、token 刷新和用户信息查询。
-所有认证统一走 SSO,不再本地签发 JWT。
+处理 SSO 登录流程、本地 JWT 签发、token 刷新和用户信息查询。
 """
 import logging
-from fastapi import APIRouter, HTTPException, Query, Request, status
+import httpx
+from fastapi import APIRouter, HTTPException, Request, status
 from pydantic import BaseModel
-from typing import Optional
 from config import settings
 from services.oauth_service import OAuthService
 from services.auth_service import AuthService
-from middleware.auth_middleware import token_cache
+from services import jwt_service
 
 logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/api/oauth", tags=["oauth"])
@@ -23,8 +22,21 @@ class OAuthLoginResponse(BaseModel):
 
 
 class SSOTokenResponse(BaseModel):
-    """SSO Token 响应(透传 SSO 中心的 token)"""
-    access_token: str
+    """SSO Token 响应(本地签发的 JWT)"""
+    token: str
+    refresh_token: str
+    token_type: str = "bearer"
+    user: dict
+
+
+class ExchangeCodeRequest(BaseModel):
+    """授权码交换请求"""
+    code: str
+
+
+class ExchangeCodeResponse(BaseModel):
+    """授权码交换响应(本地签发的 JWT)"""
+    token: str
     refresh_token: str
     token_type: str = "bearer"
     user: dict
@@ -48,140 +60,153 @@ class UserResponse(BaseModel):
 async def oauth_login():
     """
     启动 OAuth 登录流程。
-    生成授权 URL 和 state 参数,前端需要保存 state 并重定向到授权 URL
+    生成授权 URL 和 state 参数。
     """
-    if not settings.OAUTH_ENABLED:
-        raise HTTPException(status_code=503, detail="SSO 认证未配置")
-
     state = OAuthService.generate_state()
     authorization_url = OAuthService.get_authorization_url(state)
 
     return OAuthLoginResponse(
         authorization_url=authorization_url,
-        state=state
+        state=state,
     )
 
 
-@router.get("/callback")
-async def oauth_callback(
-    code: str = Query(..., description="OAuth 授权码"),
-    state: str = Query(..., description="State 参数"),
-):
+@router.post("/exchange-code", response_model=ExchangeCodeResponse)
+async def exchange_code(request_body: ExchangeCodeRequest):
     """
-    OAuth 回调端点。
-    用授权码换取 SSO token,获取用户信息并同步到本地数据库,
-    直接返回 SSO 的 access_token 和 refresh_token(不再本地签发 JWT)。
+    授权码交换端点(前端调用)。
+    前端从 SSO 回调拿到 code 后,调用此接口换取本地 JWT。
     """
-    logger.info(f"OAuth callback received: code={code[:10]}..., state={state[:10]}...")
-    
-    if not settings.OAUTH_ENABLED:
-        raise HTTPException(status_code=503, detail="SSO 认证未配置")
+    logger.info(f"Exchange code received: code={request_body.code[:10]}...")
 
-    try:
-        # 1. 用授权码换取 SSO token
-        logger.debug("Exchanging code for token...")
-        token_data = await OAuthService.exchange_code_for_token(code)
-        access_token = token_data.get("access_token")
-        refresh_token = token_data.get("refresh_token", "")
+    # 1. 用授权码换取 SSO access_token
+    token_data = await OAuthService.exchange_code_for_token(request_body.code)
+    sso_access_token = token_data.get("access_token")
 
-        if not access_token:
-            raise HTTPException(status_code=400, detail="未能获取访问令牌")
+    if not sso_access_token:
+        raise HTTPException(status_code=400, detail="未能获取访问令牌")
 
-        # 2. 使用 SSO token 获取完整用户信息(含角色)
-        logger.debug("Verifying SSO token and getting user info...")
-        try:
-            user_info = await OAuthService.verify_sso_token(access_token)
-            logger.debug(f"User info: {user_info.get('username')}, role: {user_info.get('role')}")
-        except HTTPException as e:
-            logger.error(f"verify_sso_token failed: status={e.status_code}, detail={e.detail}")
-            raise
-        except Exception as e:
-            logger.error(f"verify_sso_token unexpected error: {e}", exc_info=True)
-            raise
+    # 2. 获取完整用户信息(含角色)
+    user_info = await OAuthService.get_user_profile(sso_access_token)
 
-        # 3. 同步用户到本地数据库(含角色映射)
-        logger.debug("Syncing user to local database...")
-        try:
-            user = OAuthService.sync_user_from_oauth(user_info)
-            logger.debug(f"User synced: id={user.id}, username={user.username}, role={user.role}")
-        except Exception as e:
-            logger.error(f"sync_user_from_oauth failed: {e}", exc_info=True)
-            raise
-
-        # 4. 缓存 token → 用户信息映射
-        token_cache.set(access_token, user_info)
-
-        # 5. 直接返回 SSO token(不再本地签发 JWT)
-        logger.info(f"OAuth login successful for user: {user.username}")
-        return SSOTokenResponse(
-            access_token=access_token,
-            refresh_token=refresh_token,
-            token_type="bearer",
-            user={
-                "id": user.id,
-                "username": user.username,
-                "email": user.email,
-                "role": user.role,
-                "created_at": str(user.created_at)
-            }
-        )
+    # 3. 同步用户到本地数据库
+    user = OAuthService.sync_user_from_oauth(user_info)
 
-    except HTTPException:
-        raise
-    except Exception as e:
-        logger.error(f"OAuth callback error: {e}", exc_info=True)
-        raise HTTPException(
-            status_code=400,
-            detail=f"OAuth 登录失败: {str(e)}"
-        )
+    # 4. 签发本地 JWT
+    access_token = jwt_service.create_access_token(
+        user_id=user.id,
+        username=user.username,
+        email=user.email,
+        role=user.role,
+    )
+    refresh_token = jwt_service.create_refresh_token(user_id=user.id)
+
+    logger.info(f"Code exchange successful for user: {user.username}")
+
+    return ExchangeCodeResponse(
+        token=access_token,
+        refresh_token=refresh_token,
+        token_type="bearer",
+        user={
+            "id": user.id,
+            "username": user.username,
+            "email": user.email,
+            "role": user.role,
+            "created_at": str(user.created_at),
+        },
+    )
 
 
 @router.post("/refresh")
 async def oauth_refresh(request_body: RefreshRequest):
     """
     Token 刷新端点。
-    将 refresh 请求转发到 SSO 中心,返回新的 token。
+    验证 refresh_token(本地 JWT),签发新的 access_token + refresh_token。
     """
-    logger.info(f"Token refresh requested, refresh_token={request_body.refresh_token[:20]}...")
-    
-    if not settings.OAUTH_ENABLED:
-        raise HTTPException(status_code=503, detail="SSO 认证未配置")
+    logger.info("Token refresh requested")
 
     try:
-        token_data = await OAuthService.refresh_sso_token(request_body.refresh_token)
-        logger.info("Token refresh successful")
-        
+        payload = jwt_service.verify_token(request_body.refresh_token)
+
+        if payload.get("type") != "refresh":
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail="无效的 refresh token",
+            )
+
+        user_id = payload.get("sub")
+
+        # 从数据库获取用户信息
+        user = AuthService.get_current_user(user_id)
+        if not user:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail="用户不存在",
+            )
+
+        new_access_token = jwt_service.create_access_token(
+            user_id=user.id,
+            username=user.username,
+            email=user.email,
+            role=user.role,
+        )
+        new_refresh_token = jwt_service.create_refresh_token(user_id=user.id)
+
         return {
-            "access_token": token_data.get("access_token"),
-            "refresh_token": token_data.get("refresh_token", ""),
-            "token_type": token_data.get("token_type", "bearer"),
+            "token": new_access_token,
+            "refresh_token": new_refresh_token,
+            "token_type": "bearer",
         }
-    except HTTPException as e:
-        logger.error(f"Token refresh failed: status={e.status_code}, detail={e.detail}")
+    except HTTPException:
         raise
     except Exception as e:
         logger.error(f"Token refresh unexpected error: {e}", exc_info=True)
         raise HTTPException(
             status_code=400,
-            detail=f"Token 刷新失败: {str(e)}"
+            detail=f"Token 刷新失败: {str(e)}",
         )
 
 
+@router.post("/logout")
+async def oauth_logout(request: Request):
+    """
+    登出端点。
+    可选通知 SSO 注销,返回统一认证平台登录页面 URL 供前端跳转。
+    """
+    auth_header = request.headers.get("Authorization", "")
+    token = auth_header.replace("Bearer ", "") if auth_header.startswith("Bearer ") else None
+
+    if token and settings.SSO_REVOKE_ENDPOINT:
+        # 通知 SSO 注销
+        try:
+            async with httpx.AsyncClient(timeout=5.0) as client:
+                await client.post(
+                    f"{settings.SSO_BASE_URL}{settings.SSO_REVOKE_ENDPOINT}",
+                    headers={"Authorization": f"Bearer {token}"},
+                )
+        except Exception as e:
+            logger.warning(f"SSO revoke failed (non-critical): {e}")
+
+    return {
+        "message": "登出成功",
+        "logout_url": settings.SSO_LOGOUT_REDIRECT_URL,
+    }
+
+
 @router.get("/me", response_model=UserResponse)
 async def get_current_user(request: Request):
     """
     获取当前认证用户信息。
-    用户信息由 AuthMiddleware 从 SSO 验证后填充到 request.state。
+    用户信息由 AuthMiddleware 从 JWT 验证后填充到 request.state。
     """
     user_data = getattr(request.state, "user", None)
 
     if not user_data:
         raise HTTPException(
             status_code=status.HTTP_401_UNAUTHORIZED,
-            detail="未认证"
+            detail="未认证",
         )
 
-    # 从本地数据库获取完整用户信息
     user = AuthService.get_current_user(user_data["id"])
 
     return UserResponse(
@@ -189,15 +214,15 @@ async def get_current_user(request: Request):
         username=user.username,
         email=user.email,
         role=user.role,
-        created_at=str(user.created_at)
+        created_at=str(user.created_at),
     )
 
 
 @router.get("/status")
 async def oauth_status():
-    """获取 OAuth 配置状态"""
+    """获取 SSO 配置状态"""
     return {
-        "enabled": settings.OAUTH_ENABLED,
-        "provider": "SSO" if settings.OAUTH_ENABLED else None,
-        "base_url": settings.OAUTH_BASE_URL if settings.OAUTH_ENABLED else None
+        "enabled": settings.SSO_ENABLED,
+        "provider": "SSO" if settings.SSO_ENABLED else None,
+        "base_url": settings.SSO_BASE_URL if settings.SSO_ENABLED else None,
     }

BIN
backend/schemas/__pycache__/project.cpython-311.pyc


+ 0 - 2
backend/services/__init__.py

@@ -3,12 +3,10 @@ Business logic services package.
 """
 from .auth_service import AuthService
 from .oauth_service import OAuthService
-from .token_cache_service import TokenCacheService
 from .export_service import ExportService
 
 __all__ = [
     "AuthService",
     "OAuthService",
-    "TokenCacheService",
     "ExportService",
 ]

+ 59 - 0
backend/services/jwt_service.py

@@ -0,0 +1,59 @@
+"""
+JWT 签发与验证服务
+用于本地用户认证,替代 SSO token 透传方案
+"""
+import jwt
+import logging
+from datetime import datetime, timedelta, timezone
+from fastapi import HTTPException, status
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+ACCESS_TOKEN_EXPIRE_MINUTES = 30
+REFRESH_TOKEN_EXPIRE_DAYS = 7
+
+
+def create_access_token(user_id: str, username: str, email: str, role: str) -> str:
+    """创建 Access Token(30 分钟有效期)"""
+    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
+    payload = {
+        "sub": user_id,
+        "username": username,
+        "email": email,
+        "role": role,
+        "exp": expire,
+        "type": "access",
+    }
+    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
+
+
+def create_refresh_token(user_id: str) -> str:
+    """创建 Refresh Token(7 天有效期)"""
+    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
+    payload = {
+        "sub": user_id,
+        "exp": expire,
+        "type": "refresh",
+    }
+    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
+
+
+def verify_token(token: str) -> dict:
+    """
+    验证 JWT Token,返回 payload。
+    如果过期或无效,抛出 HTTPException。
+    """
+    try:
+        payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
+        return payload
+    except jwt.ExpiredSignatureError:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="令牌已过期,请重新登录",
+        )
+    except jwt.InvalidTokenError:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="无效的令牌",
+        )

+ 140 - 284
backend/services/oauth_service.py

@@ -1,6 +1,6 @@
 """
 OAuth 2.0 认证服务
-处理与 OAuth 认证中心的交互,包括 token 验证和刷新
+处理与统一认证平台的交互,包括 code 换 token、用户信息获取和角色映射
 """
 import httpx
 import logging
@@ -14,388 +14,244 @@ from database import get_db_connection
 
 logger = logging.getLogger(__name__)
 
-# SSO 角色 → 本地角色映射(支持中英文)
+# SSO 角色 → 本地角色映射
+# 仅识别 label_admin(标注管理员)、annotator(标注员)、viewer(查看者)
+# 其他角色一律不识别,无权限
 SSO_ROLE_MAPPING = {
-    # 英文角色名
-    "super_admin": "admin",
+    # 角色代码
     "label_admin": "admin",
-    "admin": "admin",
-    "labeler": "annotator",
-    "user_manager": "admin",
-    "app_manager": "admin",
-    # 中文角色名
-    "超级管理员": "admin",
+    "annotator": "annotator",
+    "viewer": "viewer",
+    # 角色名称(对应中文显示)
     "标注管理员": "admin",
-    "管理员": "admin",
     "标注员": "annotator",
-    "用户管理员": "admin",
-    "应用管理员": "admin",
+    "查看者": "viewer",
 }
-DEFAULT_LOCAL_ROLE = "viewer"
 
 
-def map_sso_roles_to_local(sso_roles: list, is_superuser: bool = False) -> str:
+def map_sso_roles_to_local(sso_roles: list) -> Optional[str]:
     """
-    将 SSO 角色列表映射为本地单一角色。
+    将 SSO 角色列表映射为本地角色。
+    仅识别 label_admin、annotator、viewer,未识别到任何角色则返回 None(无权限)。
     优先级: admin > annotator > viewer
     """
-    if is_superuser:
-        return "admin"
-
-    local_role = DEFAULT_LOCAL_ROLE
+    local_role: Optional[str] = None
     for sso_role in sso_roles:
         mapped = SSO_ROLE_MAPPING.get(sso_role)
         if mapped == "admin":
             return "admin"
         if mapped == "annotator":
             local_role = "annotator"
+        elif mapped == "viewer" and local_role is None:
+            local_role = "viewer"
 
     return local_role
 
 
 class OAuthService:
     """OAuth 2.0 认证服务"""
-    
+
     @staticmethod
     def generate_state() -> str:
-        """
-        生成随机 state 参数,用于防止 CSRF 攻击
-        
-        Returns:
-            随机字符串
-        """
+        """生成随机 state 参数,用于防止 CSRF 攻击"""
         return secrets.token_urlsafe(32)
-    
+
     @staticmethod
     def get_authorization_url(state: str) -> str:
-        """
-        构建 OAuth 授权 URL
-        
-        Args:
-            state: 防CSRF的随机字符串
-            
-        Returns:
-            完整的授权URL
-        """
+        """构建 OAuth 授权 URL"""
         from urllib.parse import urlencode
-        
+
         params = {
             "response_type": "code",
-            "client_id": settings.OAUTH_CLIENT_ID,
-            "redirect_uri": settings.OAUTH_REDIRECT_URI,
-            "scope": settings.OAUTH_SCOPE,
-            "state": state
+            "client_id": settings.SSO_CLIENT_ID,
+            "redirect_uri": settings.SSO_REDIRECT_URI,
+            "scope": settings.SSO_SCOPE,
+            "state": state,
         }
-        
-        authorize_url = f"{settings.OAUTH_BASE_URL}{settings.OAUTH_AUTHORIZE_ENDPOINT}"
+
+        authorize_url = f"{settings.SSO_BASE_URL}{settings.SSO_AUTHORIZE_ENDPOINT}"
         return f"{authorize_url}?{urlencode(params)}"
-    
+
     @staticmethod
     async def exchange_code_for_token(code: str) -> Dict[str, Any]:
-        """
-        用授权码换取访问令牌
-        
-        Args:
-            code: OAuth 授权码
-            
-        Returns:
-            令牌信息字典,包含 access_token, token_type, expires_in 等
-            
-        Raises:
-            Exception: 令牌交换失败
-        """
-        token_url = f"{settings.OAUTH_BASE_URL}{settings.OAUTH_TOKEN_ENDPOINT}"
-        
+        """用授权码换取访问令牌"""
+        token_url = f"{settings.SSO_BASE_URL}{settings.SSO_TOKEN_ENDPOINT}"
+
         async with httpx.AsyncClient() as client:
             response = await client.post(
                 token_url,
                 data={
                     "grant_type": "authorization_code",
                     "code": code,
-                    "redirect_uri": settings.OAUTH_REDIRECT_URI,
-                    "client_id": settings.OAUTH_CLIENT_ID,
-                    "client_secret": settings.OAUTH_CLIENT_SECRET
+                    "redirect_uri": settings.SSO_REDIRECT_URI,
+                    "client_id": settings.SSO_CLIENT_ID,
+                    "client_secret": settings.SSO_CLIENT_SECRET,
                 },
-                headers={"Content-Type": "application/x-www-form-urlencoded"}
+                headers={"Content-Type": "application/x-www-form-urlencoded"},
             )
-            
+
             if response.status_code != 200:
                 raise Exception(f"令牌交换失败 ({response.status_code}): {response.text}")
-            
+
             data = response.json()
-            
-            # 处理不同的响应格式
+
             if "access_token" in data:
                 return data
-            # 处理包装格式 {"code": 0, "data": {...}} 或 {"code": "000000", "data": {...}}
-            code = data.get("code")
-            if (code == 0 or code == "000000") and "data" in data:
+            code_val = data.get("code")
+            if (code_val == 0 or code_val == "000000") and "data" in data:
                 return data["data"]
             else:
                 raise Exception(f"无效的令牌响应格式: {data}")
-    
+
     @staticmethod
     async def get_user_info(access_token: str) -> Dict[str, Any]:
-        """
-        使用访问令牌获取用户信息
-        
-        Args:
-            access_token: OAuth 访问令牌
-            
-        Returns:
-            用户信息字典
-            
-        Raises:
-            Exception: 获取用户信息失败
-        """
-        userinfo_url = f"{settings.OAUTH_BASE_URL}{settings.OAUTH_USERINFO_ENDPOINT}"
-        
+        """使用访问令牌获取用户信息(不含角色)"""
+        userinfo_url = f"{settings.SSO_BASE_URL}{settings.SSO_USERINFO_ENDPOINT}"
+
         async with httpx.AsyncClient() as client:
             response = await client.get(
                 userinfo_url,
-                headers={"Authorization": f"Bearer {access_token}"}
+                headers={"Authorization": f"Bearer {access_token}"},
             )
-            
+
             if response.status_code != 200:
                 raise Exception(f"获取用户信息失败 ({response.status_code}): {response.text}")
-            
+
             data = response.json()
-            
-            # 处理不同的响应格式
+
             if "sub" in data or "id" in data:
                 return data
-            # 处理包装格式 {"code": 0, "data": {...}} 或 {"code": "000000", "data": {...}}
-            code = data.get("code")
-            if (code == 0 or code == "000000") and "data" in data:
+            code_val = data.get("code")
+            if (code_val == 0 or code_val == "000000") and "data" in data:
                 return data["data"]
             else:
                 raise Exception(f"无效的用户信息响应格式: {data}")
-    
+
+    @staticmethod
+    async def get_user_profile(access_token: str) -> Dict[str, Any]:
+        """
+        通过 SSO /oauth/userinfo 获取用户信息和角色。
+        返回格式: {sub, username, email, roles: [{name, code}]}
+        """
+        userinfo_url = f"{settings.SSO_BASE_URL}{settings.SSO_USERINFO_ENDPOINT}"
+        profile = {}
+
+        async with httpx.AsyncClient(timeout=10.0) as client:
+            try:
+                response = await client.get(
+                    userinfo_url,
+                    headers={"Authorization": f"Bearer {access_token}"},
+                )
+            except httpx.RequestError:
+                raise HTTPException(
+                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+                    detail="SSO 认证中心不可用",
+                )
+
+        if response.status_code == 401:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail="无效的访问令牌",
+            )
+
+        if response.status_code == 200:
+            data = response.json()
+            logger.debug(f"SSO userinfo response: {data}")
+
+            code_val = data.get("code")
+            if (code_val == 0 or code_val == "000000") and "data" in data:
+                profile = data["data"]
+            elif "id" in data or "username" in data or "sub" in data:
+                profile = data
+
+        # 解析角色(支持对象和字符串两种格式)
+        raw_roles = profile.get("roles", [])
+        sso_roles: list = []
+        for role_item in raw_roles:
+            if isinstance(role_item, dict):
+                sso_roles.append(role_item.get("code", ""))
+                name = role_item.get("name", "")
+                if name:
+                    sso_roles.append(name)
+            elif isinstance(role_item, str):
+                sso_roles.append(role_item)
+
+        local_role = map_sso_roles_to_local(sso_roles)
+
+        logger.info(
+            f"SSO 用户 {profile.get('username')}: "
+            f"roles={raw_roles}, sso_roles={sso_roles} → local_role={local_role}"
+        )
+
+        return {
+            "id": profile.get("id") or profile.get("sub"),
+            "username": profile.get("username") or profile.get("preferred_username") or profile.get("name"),
+            "email": profile.get("email", ""),
+            "role": local_role,
+            "sso_roles": sso_roles,
+        }
+
     @staticmethod
     def sync_user_from_oauth(oauth_user_info: Dict[str, Any]) -> User:
         """
         从 OAuth 用户信息同步到本地数据库
         如果用户不存在则创建,如果存在则更新(包括角色)
-        
-        Args:
-            oauth_user_info: OAuth 返回的用户信息
-            
-        Returns:
-            本地用户对象
         """
         with get_db_connection() as conn:
             cursor = conn.cursor()
-            
-            # 提取用户信息(兼容不同的字段名)
+
             oauth_id = oauth_user_info.get("sub") or oauth_user_info.get("id")
             username = oauth_user_info.get("username") or oauth_user_info.get("preferred_username") or oauth_user_info.get("name")
             email = oauth_user_info.get("email", "")
-            
+
             if not oauth_id:
                 raise ValueError("OAuth 用户信息缺少 ID 字段")
-            
+
             if not username:
                 raise ValueError("OAuth 用户信息缺少用户名字段")
-            
-            # 计算本地角色
+
             sso_roles = oauth_user_info.get("sso_roles") or oauth_user_info.get("roles", [])
-            is_superuser = bool(oauth_user_info.get("is_superuser", False))
-            role = oauth_user_info.get("role") or map_sso_roles_to_local(sso_roles, is_superuser)
-            
+            role = oauth_user_info.get("role") or map_sso_roles_to_local(sso_roles)
+
+            if role is None:
+                raise ValueError(f"用户 {username} 没有被识别的 SSO 角色(sso_roles={sso_roles}),无权限访问")
+
             logger.debug(f"sync_user_from_oauth: oauth_id={oauth_id}, username={username}, sso_roles={sso_roles}, computed_role={role}")
-            
-            # 查找是否已存在该 OAuth 用户
+
             cursor.execute(
                 "SELECT * FROM users WHERE oauth_provider = %s AND oauth_id = %s",
-                ("sso", oauth_id)
+                ("sso", oauth_id),
             )
             row = cursor.fetchone()
-            
+
             if row:
-                # 用户已存在,更新信息(包括角色)
                 user = User.from_row(row)
                 logger.debug(f"User exists: id={user.id}, old_role={user.role}, new_role={role}")
-                
-                cursor.execute("""
-                    UPDATE users 
-                    SET username = %s, email = %s, role = %s
-                    WHERE id = %s
-                """, (username, email, role, user.id))
-                
+
+                cursor.execute(
+                    "UPDATE users SET username = %s, email = %s, role = %s WHERE id = %s",
+                    (username, email, role, user.id),
+                )
                 conn.commit()
                 logger.debug(f"User updated in database")
-                
-                # 重新查询更新后的用户
+
                 cursor.execute("SELECT * FROM users WHERE id = %s", (user.id,))
                 row = cursor.fetchone()
-                updated_user = User.from_row(row)
-                logger.debug(f"User after update: role={updated_user.role}")
-                return updated_user
+                return User.from_row(row)
             else:
-                # 新用户,创建记录
                 user_id = f"user_{datetime.now().strftime('%Y%m%d%H%M%S')}_{secrets.token_hex(4)}"
-                
-                cursor.execute("""
+
+                cursor.execute(
+                    """
                     INSERT INTO users (
                         id, username, email, password_hash, role,
                         oauth_provider, oauth_id, created_at
                     ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
-                """, (
-                    user_id,
-                    username,
-                    email,
-                    "",  # OAuth 用户不需要密码
-                    role,
-                    "sso",
-                    oauth_id,
-                    datetime.now()
-                ))
-                
+                    """,
+                    (user_id, username, email, "", role, "sso", oauth_id, datetime.now()),
+                )
                 conn.commit()
-                
-                # 查询新创建的用户
+
                 cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
                 row = cursor.fetchone()
                 return User.from_row(row)
-
-    @staticmethod
-    async def verify_sso_token(access_token: str) -> Dict[str, Any]:
-        """
-        通过 SSO 验证 token 并获取用户信息(含角色)。
-        
-        使用 /api/v1/system/users/profile 端点获取完整用户信息,
-        包括 roles 列表和 is_superuser 标记,然后映射为本地角色。
-        
-        Args:
-            access_token: SSO 访问令牌
-            
-        Returns:
-            用户信息字典 {id, username, email, role, ...}
-            
-        Raises:
-            HTTPException(401): token 无效
-            HTTPException(503): SSO 中心不可用
-        """
-        profile_url = f"{settings.OAUTH_BASE_URL}/api/v1/system/users/profile"
-        
-        async with httpx.AsyncClient(timeout=10.0) as client:
-            try:
-                response = await client.get(
-                    profile_url,
-                    headers={"Authorization": f"Bearer {access_token}"}
-                )
-            except httpx.RequestError:
-                raise HTTPException(
-                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
-                    detail="SSO 认证中心不可用"
-                )
-        
-        if response.status_code == 401:
-            raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED,
-                detail="无效的访问令牌"
-            )
-        
-        if response.status_code != 200:
-            raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED,
-                detail=f"SSO 验证失败 ({response.status_code})"
-            )
-        
-        data = response.json()
-        logger.debug(f"SSO profile response: {data}")
-        
-        # 处理包装格式 {"code": 0, "data": {...}} 或 {"code": "000000", "data": {...}}
-        code = data.get("code")
-        if (code == 0 or code == "000000") and "data" in data:
-            profile = data["data"]
-        elif "id" in data or "username" in data:
-            profile = data
-        else:
-            logger.error(f"Invalid profile response format: {data}")
-            raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED,
-                detail="无效的访问令牌"
-            )
-        
-        # 提取角色信息并映射
-        sso_roles = profile.get("roles", [])
-        is_superuser = bool(profile.get("is_superuser", False))
-        local_role = map_sso_roles_to_local(sso_roles, is_superuser)
-        
-        logger.info(
-            f"SSO 用户 {profile.get('username')}: "
-            f"sso_roles={sso_roles}, is_superuser={is_superuser} → local_role={local_role}"
-        )
-        
-        # 返回统一格式的用户信息
-        return {
-            "id": profile.get("id"),
-            "username": profile.get("username"),
-            "email": profile.get("email", ""),
-            "role": local_role,
-            "sso_roles": sso_roles,
-            "is_superuser": is_superuser,
-        }
-
-    @staticmethod
-    async def refresh_sso_token(refresh_token: str) -> Dict[str, Any]:
-        """
-        向 SSO 中心刷新 token。
-        
-        Args:
-            refresh_token: SSO 刷新令牌
-            
-        Returns:
-            新的 token 信息 {access_token, refresh_token, ...}
-            
-        Raises:
-            HTTPException(401): refresh_token 无效
-            HTTPException(503): SSO 中心不可用
-        """
-        token_url = f"{settings.OAUTH_BASE_URL}{settings.OAUTH_TOKEN_ENDPOINT}"
-        logger.debug(f"Refreshing token at: {token_url}")
-        
-        async with httpx.AsyncClient(timeout=10.0) as client:
-            try:
-                response = await client.post(
-                    token_url,
-                    data={
-                        "grant_type": "refresh_token",
-                        "refresh_token": refresh_token,
-                        "client_id": settings.OAUTH_CLIENT_ID,
-                        "client_secret": settings.OAUTH_CLIENT_SECRET
-                    },
-                    headers={"Content-Type": "application/x-www-form-urlencoded"}
-                )
-            except httpx.RequestError as e:
-                logger.error(f"SSO refresh request failed: {e}")
-                raise HTTPException(
-                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
-                    detail="SSO 认证中心不可用"
-                )
-        
-        logger.debug(f"SSO refresh response: status={response.status_code}")
-        
-        if response.status_code != 200:
-            logger.error(f"SSO refresh failed: {response.status_code}, body={response.text}")
-            raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED,
-                detail="刷新令牌无效或已过期,请重新登录"
-            )
-        
-        data = response.json()
-        logger.debug(f"SSO refresh response data: {data}")
-        
-        # 处理包装格式 {"code": 0, "data": {...}} 或 {"code": "000000", "data": {...}}
-        code = data.get("code")
-        if (code == 0 or code == "000000") and "data" in data:
-            return data["data"]
-        elif "access_token" in data:
-            return data
-        else:
-            logger.error(f"Invalid refresh response format: {data}")
-            raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED,
-                detail="刷新令牌无效或已过期,请重新登录"
-            )

+ 0 - 84
backend/services/token_cache_service.py

@@ -1,84 +0,0 @@
-"""
-SSO Token 内存缓存服务
-缓存 token → 用户信息映射,减少对 SSO 中心的请求压力。
-支持可配置的 TTL 过期策略。
-"""
-import time
-import threading
-from dataclasses import dataclass, field
-from typing import Dict, Any, Optional
-
-
-@dataclass
-class CacheEntry:
-    """缓存条目,包含用户信息和创建时间"""
-    user_info: Dict[str, Any]
-    created_at: float = field(default_factory=time.time)
-
-    def is_expired(self, ttl: float) -> bool:
-        """检查缓存条目是否已过期"""
-        return (time.time() - self.created_at) > ttl
-
-
-class TokenCacheService:
-    """
-    SSO Token 内存缓存服务
-
-    通过缓存 token → 用户信息的映射,避免每次 API 请求都调用 SSO 中心验证。
-    缓存条目在 TTL 过期后自动失效,下次访问时会被清除。
-    """
-
-    def __init__(self, ttl_seconds: int = 300):
-        """
-        Args:
-            ttl_seconds: 缓存过期时间(秒),默认 300 秒(5 分钟)
-        """
-        self._cache: Dict[str, CacheEntry] = {}
-        self._ttl = ttl_seconds
-        self._lock = threading.Lock()
-
-    def get(self, token: str) -> Optional[Dict[str, Any]]:
-        """
-        查询缓存,返回用户信息或 None。
-        如果缓存条目已过期,自动删除并返回 None。
-
-        Args:
-            token: SSO access_token
-
-        Returns:
-            用户信息字典,或 None(未命中/已过期)
-        """
-        with self._lock:
-            entry = self._cache.get(token)
-            if entry is None:
-                return None
-            if entry.is_expired(self._ttl):
-                del self._cache[token]
-                return None
-            return entry.user_info
-
-    def set(self, token: str, user_info: Dict[str, Any]) -> None:
-        """
-        写入缓存。
-
-        Args:
-            token: SSO access_token
-            user_info: 用户信息字典
-        """
-        with self._lock:
-            self._cache[token] = CacheEntry(user_info=user_info)
-
-    def invalidate(self, token: str) -> None:
-        """
-        使指定 token 的缓存失效。
-
-        Args:
-            token: SSO access_token
-        """
-        with self._lock:
-            self._cache.pop(token, None)
-
-    def clear(self) -> None:
-        """清空所有缓存"""
-        with self._lock:
-            self._cache.clear()

+ 11 - 9
backend/test/auth_test_helper.py

@@ -1,22 +1,24 @@
 """
-Test helper for SSO-based authentication.
-Provides utilities to create test tokens by injecting them directly
-into the token cache, bypassing SSO center verification.
+Test helper for JWT-based authentication.
+Provides utilities to create test JWT tokens.
 """
 import uuid
-from middleware.auth_middleware import token_cache
+from services import jwt_service
 
 
 def create_test_token(user_data: dict) -> str:
     """
-    Create a fake SSO token for testing by injecting it into the token cache.
+    Create a test JWT token for testing.
 
     Args:
         user_data: Dict with id, username, email, role
 
     Returns:
-        A fake token string that will be recognized by the auth middleware.
+        A valid JWT token string.
     """
-    fake_token = f"test_sso_token_{uuid.uuid4().hex}"
-    token_cache.set(fake_token, user_data)
-    return fake_token
+    return jwt_service.create_access_token(
+        user_id=user_data.get("id", f"test_user_{uuid.uuid4().hex}"),
+        username=user_data.get("username", "testuser"),
+        email=user_data.get("email", "test@example.com"),
+        role=user_data.get("role", "viewer"),
+    )

+ 30 - 31
backend/test_oauth_config.py

@@ -1,53 +1,52 @@
 """
-测试 OAuth 配置
-直接访问 OAuth 服务器验证配置
+测试 SSO 配置
+直接访问统一认证平台验证配置
 """
 import requests
 from config import settings
 
-def test_oauth_config():
-    """测试 OAuth 配置"""
+
+def test_sso_config():
+    """测试 SSO 配置"""
     print("=" * 60)
-    print("OAuth 配置测试")
+    print("SSO 配置测试")
     print("=" * 60)
-    
-    print(f"\n1. OAuth 配置:")
-    print(f"   Base URL: {settings.OAUTH_BASE_URL}")
-    print(f"   Client ID: {settings.OAUTH_CLIENT_ID}")
-    print(f"   Client Secret: {settings.OAUTH_CLIENT_SECRET}")
-    print(f"   Redirect URI: {settings.OAUTH_REDIRECT_URI}")
-    print(f"   Authorize Endpoint: {settings.OAUTH_AUTHORIZE_ENDPOINT}")
-    
+
+    print(f"\n1. SSO 配置:")
+    print(f"   Base URL: {settings.SSO_BASE_URL}")
+    print(f"   Client ID: {settings.SSO_CLIENT_ID}")
+    print(f"   Client Secret: {settings.SSO_CLIENT_SECRET}")
+    print(f"   Redirect URI: {settings.SSO_REDIRECT_URI}")
+    print(f"   Authorize Endpoint: {settings.SSO_AUTHORIZE_ENDPOINT}")
+
     # 构建授权 URL
     from urllib.parse import urlencode
+
     params = {
         "response_type": "code",
-        "client_id": settings.OAUTH_CLIENT_ID,
-        "redirect_uri": settings.OAUTH_REDIRECT_URI,
-        "scope": settings.OAUTH_SCOPE,
-        "state": "test_state_123"
+        "client_id": settings.SSO_CLIENT_ID,
+        "redirect_uri": settings.SSO_REDIRECT_URI,
+        "scope": settings.SSO_SCOPE,
+        "state": "test_state_123",
     }
-    
-    authorize_url = f"{settings.OAUTH_BASE_URL}{settings.OAUTH_AUTHORIZE_ENDPOINT}"
+
+    authorize_url = f"{settings.SSO_BASE_URL}{settings.SSO_AUTHORIZE_ENDPOINT}"
     full_url = f"{authorize_url}?{urlencode(params)}"
-    
+
     print(f"\n2. 授权 URL:")
     print(f"   {full_url}")
-    
-    print(f"\n3. 测试 OAuth 服务器连接...")
+
+    print(f"\n3. 测试 SSO 服务器连接...")
     try:
-        response = requests.get(settings.OAUTH_BASE_URL, timeout=5)
-        print(f"   ✓ OAuth 服务器可访问 (状态码: {response.status_code})")
+        response = requests.get(settings.SSO_BASE_URL, timeout=5)
+        print(f"   ✓ SSO 服务器可访问 (状态码: {response.status_code})")
     except Exception as e:
-        print(f"   ✗ OAuth 服务器不可访问: {e}")
-    
+        print(f"   ✗ SSO 服务器不可访问: {e}")
+
     print("\n" + "=" * 60)
     print("请手动访问上面的授权 URL 测试登录流程")
-    print("如果返回 'invalid_client' 错误,请检查:")
-    print("1. Client ID 是否在 OAuth 服务器上注册")
-    print("2. Redirect URI 是否在 OAuth 服务器上配置")
-    print("3. Client Secret 是否正确")
     print("=" * 60)
 
+
 if __name__ == "__main__":
-    test_oauth_config()
+    test_sso_config()

+ 33 - 39
web/apps/lq_label/src/components/oauth-callback/oauth-callback.tsx

@@ -1,14 +1,15 @@
 /**
  * OAuth callback component
- * Handles OAuth redirect and completes authentication
+ * Handles SSO redirect. SSO redirects to this page with code in URL params.
+ * Frontend calls backend /api/oauth/exchange-code to get JWT tokens.
  */
 import React, { useEffect, useState, useRef } from 'react';
 import { useNavigate, useSearchParams } from 'react-router-dom';
 import { useAtom } from 'jotai';
-import { handleOAuthCallback } from '../../services/oauth-service';
 import { loginAtom } from '../../atoms/auth-atoms';
 import { toast } from '../../services/toast';
 import styles from './oauth-callback.module.scss';
+import { apiClient } from '../../services/api';
 
 export const OAuthCallback: React.FC = () => {
   const [searchParams] = useSearchParams();
@@ -20,18 +21,18 @@ export const OAuthCallback: React.FC = () => {
   useEffect(() => {
     // Prevent duplicate processing in React StrictMode
     if (isProcessingRef.current) {
-      console.log('Already processing, skipping duplicate call');
+      console.log('Already processing, skip duplicate call');
       return;
     }
 
     const processCallback = async () => {
       isProcessingRef.current = true;
       console.log('Starting OAuth callback processing...');
-      
-      // 清除任何旧的认证数据,避免干扰 OAuth 流程
+
+      // Clear any stale auth data
       localStorage.removeItem('auth_tokens');
       localStorage.removeItem('current_user');
-      
+
       try {
         // Log all URL parameters for debugging
         const allParams: Record<string, string> = {};
@@ -41,59 +42,52 @@ export const OAuthCallback: React.FC = () => {
         console.log('OAuth callback URL parameters:', allParams);
 
         // Check for error from OAuth provider
-        const error = searchParams.get('error');
+        const oauthError = searchParams.get('error');
         const errorDescription = searchParams.get('error_description');
-        
-        if (error) {
-          throw new Error(`OAuth 错误: ${error} - ${errorDescription || '未知错误'}`);
+
+        if (oauthError) {
+          throw new Error(`OAuth 错误: ${oauthError} - ${errorDescription || '未知错误'}`);
         }
 
-        // Get code and state from URL
+        // Get code from URL (SSO redirected here with code)
         const code = searchParams.get('code');
-        const state = searchParams.get('state');
-
-        console.log('Code:', code);
-        console.log('State:', state);
 
-        if (!code || !state) {
-          console.error('Missing parameters. All params:', allParams);
-          throw new Error('缺少授权码或 state 参数');
+        if (!code) {
+          console.error('Missing code parameter. All params:', allParams);
+          throw new Error('缺少授权码参数');
         }
 
-        // Verify state matches what we saved
-        const savedState = sessionStorage.getItem('oauth_state');
-        console.log('Saved state:', savedState);
-        
-        if (state !== savedState) {
-          throw new Error('State 参数不匹配,可能存在 CSRF 攻击');
-        }
-
-        // Exchange code for tokens
         console.log('Exchanging code for tokens...');
-        const response = await handleOAuthCallback({ code, state });
-        console.log('Token exchange successful');
 
-        // Clear saved state
-        sessionStorage.removeItem('oauth_state');
+        // Call backend to exchange code for JWT tokens
+        const response = await apiClient.post('/api/oauth/exchange-code', {
+          code,
+        });
+
+        const { token, refresh_token, user } = response.data;
+        console.log('Token exchange successful, user:', user);
 
-        // Update auth state
+        // Save tokens and user to localStorage via Jotai
         setAuth({
           tokens: {
-            access_token: response.access_token,
-            refresh_token: response.refresh_token,
-            token_type: response.token_type,
+            access_token: token,
+            refresh_token: refresh_token,
+            token_type: 'bearer',
           },
           user: {
-            ...response.user,
-            role: response.user.role as 'annotator' | 'admin' | 'viewer',
+            id: user.id,
+            username: user.username,
+            email: user.email,
+            role: user.role as 'annotator' | 'admin' | 'viewer',
+            created_at: user.created_at,
           },
         });
 
         // Show success message
-        toast.success(`欢迎,${response.user.username}!`);
+        toast.success(`欢迎,${user.username}!`);
 
         // Redirect to home page
-        navigate('/');
+        navigate('/', { replace: true });
       } catch (err: any) {
         console.error('OAuth callback error:', err);
         setError(err.message || 'OAuth 登录失败');

+ 7 - 9
web/apps/lq_label/src/components/user-menu/user-menu.tsx

@@ -3,17 +3,15 @@
  * Displays current user info and logout button
  */
 import React, { useState, useRef, useEffect } from 'react';
-import { useAtom, useAtomValue } from 'jotai';
-import { useNavigate } from 'react-router-dom';
-import { currentUserAtom, logoutAtom, isAdminAtom } from '../../atoms/auth-atoms';
+import { useAtomValue } from 'jotai';
+import { currentUserAtom, isAdminAtom } from '../../atoms/auth-atoms';
+import { logout as logoutService } from '../../services/oauth-service';
 import { toast } from '../../services/toast';
 import styles from './user-menu.module.scss';
 
 export const UserMenu: React.FC = () => {
   const user = useAtomValue(currentUserAtom);
   const isAdmin = useAtomValue(isAdminAtom);
-  const [, logout] = useAtom(logoutAtom);
-  const navigate = useNavigate();
 
   const [isOpen, setIsOpen] = useState(false);
   const menuRef = useRef<HTMLDivElement>(null);
@@ -35,10 +33,10 @@ export const UserMenu: React.FC = () => {
     };
   }, [isOpen]);
 
-  const handleLogout = () => {
-    logout();
-    toast.success('已退出登录');
-    navigate('/login');
+  const handleLogout = async () => {
+    toast.success('正在退出...');
+    await logoutService();
+    // logoutService handles the redirect, so we don't need to navigate
   };
 
   if (!user) {

+ 2 - 2
web/apps/lq_label/src/services/api.ts

@@ -89,7 +89,7 @@ apiClient.interceptors.request.use(
     // Skip token attachment for OAuth public endpoints
     if (
       config.url?.includes('/api/oauth/login') ||
-      config.url?.includes('/api/oauth/callback') ||
+      config.url?.includes('/api/oauth/exchange-code') ||
       config.url?.includes('/api/oauth/refresh')
     ) {
       return config;
@@ -188,7 +188,7 @@ apiClient.interceptors.response.use(
           );
 
           const newTokens = {
-            access_token: response.data.access_token,
+            access_token: response.data.token || response.data.access_token,
             refresh_token: response.data.refresh_token || tokens.refresh_token,
             token_type: response.data.token_type,
           };

+ 24 - 7
web/apps/lq_label/src/services/auth-service.ts

@@ -8,10 +8,8 @@ import { apiClient } from './api';
 import type { User } from '../atoms/auth-atoms';
 
 /**
- * Get current user information from SSO-verified session.
- * Requires valid SSO access token in Authorization header.
- *
- * @returns Current user information
+ * Get current user information from JWT-verified session.
+ * Requires valid JWT token in Authorization header.
  */
 export async function getCurrentUser(): Promise<User> {
   const response = await apiClient.get<User>('/api/oauth/me');
@@ -19,10 +17,29 @@ export async function getCurrentUser(): Promise<User> {
 }
 
 /**
- * Logout (client-side only)
- * Clears tokens from storage
+ * Logout - notify backend (SSO revoke) and clear local state,
+ * then redirect to SSO login page.
  */
-export function logout(): void {
+export async function logout(): Promise<void> {
+  let logoutUrl: string | undefined;
+  const tokens = localStorage.getItem('auth_tokens');
+  if (tokens) {
+    try {
+      const { access_token } = JSON.parse(tokens);
+      const resp = await apiClient.post('/api/oauth/logout', null, {
+        headers: { Authorization: `Bearer ${access_token}` },
+      });
+      logoutUrl = resp.data.logout_url;
+    } catch {
+      // Non-critical, proceed with local cleanup
+    }
+  }
   localStorage.removeItem('auth_tokens');
   localStorage.removeItem('current_user');
+
+  if (logoutUrl) {
+    window.location.href = logoutUrl;
+  } else {
+    window.location.href = '/login';
+  }
 }

+ 31 - 47
web/apps/lq_label/src/services/oauth-service.ts

@@ -1,6 +1,7 @@
 /**
  * OAuth 2.0 authentication service
- * Handles OAuth login flow with SSO provider
+ * Handles OAuth login flow with SSO provider.
+ * The callback is handled via backend 302 redirect, not via API call.
  */
 import { apiClient } from './api';
 
@@ -12,30 +13,6 @@ export interface OAuthLoginResponse {
   state: string;
 }
 
-/**
- * OAuth callback parameters
- */
-export interface OAuthCallbackParams {
-  code: string;
-  state: string;
-}
-
-/**
- * OAuth token response
- */
-export interface OAuthTokenResponse {
-  access_token: string;
-  refresh_token: string;
-  token_type: string;
-  user: {
-    id: string;
-    username: string;
-    email: string;
-    role: string;
-    created_at: string;
-  };
-}
-
 /**
  * OAuth status response
  */
@@ -54,32 +31,11 @@ export async function initiateOAuthLogin(): Promise<OAuthLoginResponse> {
   return response.data;
 }
 
-/**
- * Handle OAuth callback
- * Exchange authorization code for tokens
- */
-export async function handleOAuthCallback(
-  params: OAuthCallbackParams
-): Promise<OAuthTokenResponse> {
-  const response = await apiClient.get<OAuthTokenResponse>(
-    '/api/oauth/callback',
-    {
-      params: {
-        code: params.code,
-        state: params.state,
-      },
-    }
-  );
-  return response.data;
-}
-
 /**
  * Get OAuth configuration status
  */
 export async function getOAuthStatus(): Promise<OAuthStatusResponse> {
-  const response = await apiClient.get<OAuthStatusResponse>(
-    '/api/oauth/status'
-  );
+  const response = await apiClient.get<OAuthStatusResponse>('/api/oauth/status');
   return response.data;
 }
 
@@ -96,3 +52,31 @@ export async function startOAuthLogin(): Promise<void> {
   // Redirect to OAuth provider
   window.location.href = authorization_url;
 }
+
+/**
+ * Logout - notify backend and clear local state, then redirect to SSO login page.
+ */
+export async function logout(): Promise<void> {
+  let logoutUrl: string | undefined;
+  const tokens = localStorage.getItem('auth_tokens');
+  if (tokens) {
+    try {
+      const { access_token } = JSON.parse(tokens);
+      const resp = await apiClient.post('/api/oauth/logout', null, {
+        headers: { Authorization: `Bearer ${access_token}` },
+      });
+      logoutUrl = resp.data.logout_url;
+    } catch {
+      // Non-critical, proceed with local cleanup
+    }
+  }
+  localStorage.removeItem('auth_tokens');
+  localStorage.removeItem('current_user');
+
+  // Redirect to SSO login page
+  if (logoutUrl) {
+    window.location.href = logoutUrl;
+  } else {
+    window.location.href = '/login';
+  }
+}

+ 185 - 0
文档/标注平台需求.md

@@ -0,0 +1,185 @@
+
+
+### 标注平台需求
+  - 功能定义: 定义标注平台  LabelingSystem/backend(后端) 、LabelingSystem/web(前端)  
+  - 应用名称:四川路桥标注平台
+  - config.dev.yaml 统一认证平台已经有配置信息(oauth.base_url 、 oauth.client_id \ oauth.client_secret \ oauth.redirect_uri )
+  - 将现有(config.dev.yaml、config.test.yaml、config.prod.yaml)统一认证平台配置调整为:样本中心接入单点登录 ,SSO_BASE_URL、 CLIENT_ID、CLIENT_SECRET、REDIRECT_URI 配置项 、 FRONTEND_URL
+    - SSO_BASE_URL(统一认证平台URL地址):http://192.168.92.61:8200
+    - CLIENT_ID(应用key App Key): nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx
+    - CLIENT_SECRET(应用密钥App Secret): wh0HU_9T83rYMjfLFToNxFOKcrk_8H7Ba_27nNGlPqtTf9ROCytsOgp2ue0ol5mm
+    - REDIRECT_URI(回调地址):http://localhost:8003/auth/callback
+    - FRONTEND_URL(前端首页访问地址):http://localhost:4200
+    - SCOPE: "profile email"
+    - SSO_LOGOUT_REDIRECT_URL:http://localhost:3000/login
+    - 获取授权码:http://192.168.92.61:8200/oauth/authorize?client_id=nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx&redirect_uri=http://localhost:8003/auth/callback&response_type=code&scope=read%20write
+    - 获取Access Token:http://192.168.92.61:8200/oauth/token?client_id=nlKLQJdJK3f5ub7UDfQ_E71z2Lo3YSQx&client_secret=wh0HU_9T83rYMjfLFToNxFOKcrk_8H7Ba_27nNGlPqtTf9ROCytsOgp2ue0ol5mm&grant_type=authorization_code&
+    - 获取用户信息:http://192.168.92.61:8200/oauth/userinfo
+  
+
+
+
+### 标注平台角色定义
+    - 角色定义
+      - label_admin 定义标注管理员
+      - annotator 定义标注员
+      - viewer 定义查看者
+    - 角色获取 统一通过统一认证平台获取 /oauth/userinfo   roles: [{ "name": "标注员", "code": "labeler" }]
+    - 只有统一认证平台返回的角色被识别到了才有权限(其中label_admin 、annotator),否则没有权限
+ 
+    - 权限区别
+      - admin 独有操作:
+        - 删除项目 / 删除任务
+        - 修改项目状态 / 修改项目配置
+        - 任务分配 / 批量分配 / 预览分配
+        - 用户管理(列表/详情/统计)
+        - 平台总览统计
+        - 创建导出任务
+        - 外部项目操作
+
+      - annotator / viewer 共同能力:
+        - 查看所有项目列表(非 admin 只能看到有自己分配任务的项目)
+        - 创建/查看/编辑自己的标注
+        - 查看自己任务下所有标注
+        - 下载已导出的数据(但不能创建导出)
+        - 查看模板和 XML 校验
+
+      - annotator 独有(viewer 没有):
+        - 出现在任务分配的候选人列表中
+        - 出现在项目统计的人员统计中
+
+        关键区别:annotator 和 viewer 在 API 层面的权限几乎一样,区别在于 annotator 能被分配到任务,viewer 不能。
+
+        
+        ┌──────────────┬──────────────┬───────────┬───────────────────────────────────────────────────────────────────────┐
+        │ SSO 角色代码 │ SSO 角色名称 │ 本地角色  │                                 权限                                  │
+        ├──────────────┼──────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
+        │ label_admin  │ 标注管理员   │ admin     │ 管理员(CRUD、任务分配、用户管理等)                                  │
+        ├──────────────┼──────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
+        │ annotator    │ 标注员       │ annotator │ 标注员(查看自己项目、创建/编辑标注)                                 │
+        ├──────────────┼──────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
+        │ viewer       │ 查看者       │ viewer    │ 查看者(查看分配给自己的项目、查看标注、下载数据,不能创建/编辑标注) │
+        ├──────────────┼──────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
+        │ 其他         │ 其他         │ 无权限    │ 拒绝访问                                                              │
+        └──────────────┴──────────────┴───────────┴───────────────────────────────────────────────────────────────────────┘
+
+        viewer 权限说明:和 annotator 基本一样但区别在于业务语义上查看者不应创建/编辑标注。如果需要严格限制 viewer 不能创建标注,需要在 annotation 相关端点增加   
+        role != "viewer" 检查。是否需要我加上这个限制?
+
+### 统一认证平台
+  - AI中台管理:统一认证平台(8200 )后端:LQAI-middle-platform, 前端:LQAI-middle-platform-front
+  - 样本中心: 子应用  后端:LQAdminPlatform  前端:LQAdminFront
+  - 要求: 统一认证平台(统一认证平台:8200)已经登录了,点击子应用图标(样本中心:8000) 直接跳转子应用首页 无需再次登录
+  
+
+
+  ● 完成。新的 SSO 免登流程:
+
+  统一认证平台 → 点击子应用图标
+    ↓
+  302 跳转到 http://localhost:4200/auth/callback?code=xxx (前端)
+    ↓
+  前端页面显示 "正在登录..." loading 效果
+    ↓
+  前端 POST /api/oauth/exchange-code { code: "xxx" }
+    ↓
+  后端:code 换 SSO token → 获取用户信息+角色 → 同步本地DB → 签发本地 JWT
+    ↓
+  返回 { token, refresh_token, user }
+    ↓
+  前端保存 token 到 localStorage → 跳转到首页 /
+
+
+
+
+  ● 方案B实现完成
+  我修改了4个文件,下面是完整的修改总结:
+
+  ---
+  一、修改的文件
+
+  ┌────────────────┬────────────────────────────────────────────────────┬───────────────────────────┐    
+  │      项目      │                      文件路径                      │         修改内容          │    
+  ├────────────────┼────────────────────────────────────────────────────┼───────────────────────────┤    
+  │ 统一认证平台后 │ LQAI-middle-platform/src/app/schemas/auth.py       │ 新增 SSORedirectRequest   │    
+  │ 端             │                                                    │ Schema                    │    
+  ├────────────────┼────────────────────────────────────────────────────┼───────────────────────────┤    
+  │ 统一认证平台后 │                                                    │ 新增 POST                 │    
+  │ 端             │ LQAI-middle-platform/src/views/auth_view.py        │ /api/v1/auth/sso-redirect │    
+  │                │                                                    │  接口                     │    
+  ├────────────────┼────────────────────────────────────────────────────┼───────────────────────────┤    
+  │ 统一认证平台前 │                                                    │ 新增                      │    
+  │ 端             │ LQAI-middle-platform-front/src/api/auth.ts         │ getSSORedirectUrl(appId)  │    
+  │                │                                                    │ API 方法                  │    
+  ├────────────────┼────────────────────────────────────────────────────┼───────────────────────────┤    
+  │                │                                                    │ 修改                      │    
+  │ 统一认证平台前 │ LQAI-middle-platform-front/src/views/dashboard/Ind │ handleAppClick:先请求    │    
+  │ 端             │ ex.vue                                             │ SSO 免登接口,失败则降级  │    
+  │                │                                                    │ 直接打开                  │    
+  └────────────────┴────────────────────────────────────────────────────┴───────────────────────────┘    
+
+  ---
+  二、统一认证平台 SSO 免登流程(点击子应用后)
+
+  1. 统一认证平台前端点击"标注平台"
+         │
+         ▼
+  2. 调用 POST /api/v1/auth/sso-redirect {app_id: "样本中心ID"}
+     (携带当前登录用户的 Bearer Token)
+         │
+         ▼
+  3. 统一认证平台后端:
+     ├── 验证用户已登录(JWT + Redis 校验)
+     ├── 查询 t_sys_app 获取样本中心的 app_key、redirect_uris
+     ├── 生成 OAuth2 授权码(一次性,10分钟有效)
+     ├── 存入 Redis
+     └── 返回 {redirect_url: "http://localhost:8000/auth/callback?code=xxx"}
+         │
+         ▼
+  4. 前端 window.open(redirect_url, '_blank')
+         │
+         ▼
+  5. 标注平台后端 /auth/callback?code=xxx
+     ├── 用 code 向统一认证平台换取 access_token
+     ├── 用 access_token 获取用户信息
+     ├── 在本地查找/创建用户
+     ├── 生成本地 JWT token
+     └── 重定向到前端 /oauth/callback?token=xxx&refresh_token=xxx
+         │
+         ▼
+  6. 标注平台前端 OAuthCallback.vue
+     ├── 从 URL 提取 token 保存到本地
+     ├── 获取用户信息
+     └── 跳转到(首页)
+
+  整个流程用户无需输入密码,无感知完成登录。
+
+
+
+
+### 统一认证平台 SSO 免登 优化
+    - 统一认证平台 点击子应用图标 请求子应用端访问授权码接口优化
+      - 直接请求应用端后台 可能会出现网络问题,导致页面白屏效果
+      - 现在调整为请求前端地址,前端页面可以增加处理中效果,提升用户体验
+      - 把应用端接收授权码配置调整为前端应用 {redirect_url: "http://localhost:4200/auth/callback?code=xxx"}
+      - 应用前端拿到授权码后,再请求应用后端接口把授权码提交给统一认证平台获取token
+      - 成果获取token后,就可以获取用户信息了
+      - 最后再调整到应用首页
+      - 如果过程中处理异常,则直接在应用前端页面提示错误信息
+ 
+
+        ● 完成。新的 SSO 免登流程:
+
+        统一认证平台 → 点击子应用图标
+            ↓
+        302 跳转到 http://localhost:4200/auth/callback?code=xxx (前端)
+            ↓
+        前端页面显示 "正在登录..." loading 效果
+            ↓
+        前端 POST /api/oauth/exchange-code { code: "xxx" }
+            ↓
+        后端:code 换 SSO token → 获取用户信息+角色 → 同步本地DB → 签发本地 JWT
+            ↓
+        返回 { token, refresh_token, user }
+            ↓
+        前端保存 token 到 localStorage → 跳转到首页 /