Forráskód Böngészése

feat: 实现标注平台 Open API 接口:1新增 API Key + Secret 签名认证接口,2新增项目列表/详情查询接口。3新增数据集下载接口 。新增 api_applications 表

xiaoyanzhen 1 hete
szülő
commit
4d85a97398

+ 18 - 0
backend/database.py

@@ -244,6 +244,24 @@ def init_database() -> None:
             ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
         """)
         
+        # Create api_applications table
+        cursor.execute("""
+            CREATE TABLE IF NOT EXISTS api_applications (
+                id              VARCHAR(36) PRIMARY KEY,
+                app_id          VARCHAR(64)  NOT NULL UNIQUE,
+                app_name        VARCHAR(128) NOT NULL COMMENT '应用名称',
+                app_secret      VARCHAR(128) NOT NULL COMMENT '应用密钥(bcrypt 哈希存储)',
+                status          VARCHAR(20)  NOT NULL DEFAULT 'active' COMMENT '状态: active/disabled',
+                description     TEXT         DEFAULT NULL COMMENT '应用描述',
+                created_by      VARCHAR(36)  DEFAULT NULL COMMENT '创建人用户ID',
+                created_at      TIMESTAMP    DEFAULT CURRENT_TIMESTAMP,
+                updated_at      TIMESTAMP    DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+                last_used_at    TIMESTAMP    NULL COMMENT '最后一次使用时间',
+                INDEX idx_app_id (app_id),
+                INDEX idx_status (status)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='API 接入应用管理'
+        """)
+
         logger.info("MySQL 数据库初始化完成")
 
 

+ 5 - 0
backend/main.py

@@ -8,6 +8,8 @@ from fastapi.middleware.cors import CORSMiddleware
 from contextlib import asynccontextmanager
 from database import init_database
 from routers import project, task, annotation, oauth, user, template, statistics, export, external
+from routers.api.v1 import open_auth_view
+from routers.api.v1.open_project_view import router as open_project_router, download_router as open_download_router
 from middleware.auth_middleware import AuthMiddleware
 
 # 配置日志
@@ -63,6 +65,9 @@ app.include_router(template.router)
 app.include_router(statistics.router)
 app.include_router(export.router)
 app.include_router(external.router)
+app.include_router(open_auth_view.router)
+app.include_router(open_project_router)
+app.include_router(open_download_router)
 
 
 @app.get("/")

+ 6 - 0
backend/middleware/auth_middleware.py

@@ -68,11 +68,17 @@ class AuthMiddleware(BaseHTTPMiddleware):
         "/api/oauth/logout",
     }
 
+    OPEN_API_PREFIX = "/api/v1/open/"
+
     async def dispatch(self, request: Request, call_next):
         # Skip authentication for public paths
         if request.url.path in self.PUBLIC_PATHS:
             return await call_next(request)
 
+        # Skip authentication for Open API routes (they use their own Bearer token check)
+        if request.url.path.startswith(self.OPEN_API_PREFIX):
+            return await call_next(request)
+
         # Skip authentication for OPTIONS requests (CORS preflight)
         if request.method == "OPTIONS":
             return await call_next(request)

+ 0 - 0
backend/routers/api/__init__.py


+ 0 - 0
backend/routers/api/v1/__init__.py


+ 74 - 0
backend/routers/api/v1/open_auth_view.py

@@ -0,0 +1,74 @@
+"""
+Open API authentication router.
+Provides token acquisition via API Key + Secret HMAC-SHA256 signature.
+"""
+from fastapi import APIRouter, Header, HTTPException, status
+from database import get_db_connection
+from schemas.open_auth import TokenResponse, TokenResponseData
+from services.api.open_auth_service import (
+    verify_signature,
+    check_and_store_nonce,
+    create_open_api_token,
+    validate_timestamp,
+    get_application_by_app_id,
+    update_last_used,
+)
+import logging
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(
+    prefix="/api/v1/open/auth",
+    tags=["open-api-auth"],
+)
+
+
+@router.post("/token", response_model=TokenResponse)
+async def get_access_token(
+    x_api_key: str = Header(..., alias="X-Api-Key"),
+    x_signature: str = Header(..., alias="X-Signature"),
+    x_timestamp: str = Header(..., alias="X-Timestamp"),
+    x_nonce: str = Header(..., alias="X-Nonce"),
+):
+    """获取 Access Token — 通过 API Key + Secret 签名认证"""
+    with get_db_connection() as conn:
+        app = get_application_by_app_id(conn, x_api_key)
+        if not app:
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail={"error_code": "INVALID_API_KEY", "message": "app_id 不存在"},
+            )
+        if app["status"] != "active":
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail={"error_code": "APP_DISABLED", "message": "应用已被禁用"},
+            )
+
+        validate_timestamp(x_timestamp)
+
+        if check_and_store_nonce(x_nonce):
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail={"error_code": "NONCE_USED", "message": "Nonce 已被使用(重放攻击)"},
+            )
+
+        if not verify_signature(x_api_key, x_timestamp, x_nonce, x_signature, app["app_secret"]):
+            raise HTTPException(
+                status_code=status.HTTP_401_UNAUTHORIZED,
+                detail={"error_code": "INVALID_SIGNATURE", "message": "签名验证失败"},
+            )
+
+        token = create_open_api_token(app["app_id"], app["app_name"])
+        update_last_used(conn, app["app_id"])
+
+    logger.info(f"Open API token issued for app_id={x_api_key}")
+
+    return TokenResponse(
+        code=0,
+        message="success",
+        data=TokenResponseData(
+            access_token=token,
+            token_type="Bearer",
+            expires_in=7200,
+        ),
+    )

+ 242 - 0
backend/routers/api/v1/open_project_view.py

@@ -0,0 +1,242 @@
+import os
+import logging
+from datetime import datetime, timezone, timedelta
+from typing import Optional
+from fastapi import APIRouter, Query, Depends, HTTPException, status
+from fastapi.responses import FileResponse
+from schemas.open_project import (
+    OpenProjectListResponse, OpenProjectListData, OpenProjectDetailResponse,
+    OpenProjectItem, OpenProjectDetailItem,
+)
+from schemas.open_dataset import (
+    DatasetDownloadRequest, DatasetDownloadResponse, DatasetDownloadResponseData,
+    TEXT_FORMATS, IMAGE_FORMATS,
+)
+from services.api.open_project_service import list_projects, get_project_detail, TASK_TO_PROJECT_TYPE, create_download_token, get_download_info
+from services.open_middleware import verify_open_api_token
+from services.export_service import ExportService
+from database import get_db_connection
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(
+    prefix="/api/v1/open/projects",
+    tags=["open-api-projects"],
+)
+
+
+@router.get("", response_model=OpenProjectListResponse)
+async def list_projects_endpoint(
+    _auth: dict = Depends(verify_open_api_token),
+    name: Optional[str] = Query(None, description="项目名称(模糊匹配)"),
+    project_type: Optional[str] = Query(None, alias="type", description="项目类型: image/text"),
+    status: Optional[str] = Query(None, description="项目状态筛选"),
+    page: int = Query(1, ge=1, description="页码"),
+    page_size: int = Query(20, ge=1, le=100, description="每页数量"),
+):
+    """查询标注项目列表"""
+    try:
+        result = list_projects(
+            name=name,
+            project_type=project_type,
+            status=status,
+            page=page,
+            page_size=page_size,
+        )
+    except Exception as e:
+        logger.error(f"Query projects failed: {e}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail={"error_code": "INTERNAL_ERROR", "message": str(e)},
+        )
+
+    return OpenProjectListResponse(
+        code=0,
+        message="success",
+        data=OpenProjectListData(**result),
+    )
+
+
+@router.get("/{project_id}", response_model=OpenProjectDetailResponse)
+async def get_project_detail_endpoint(
+    project_id: str,
+    _auth: dict = Depends(verify_open_api_token),
+):
+    """根据项目 ID 查询项目详细信息"""
+    try:
+        result = get_project_detail(project_id)
+    except Exception as e:
+        logger.error(f"Query project detail failed: {e}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail={"error_code": "INTERNAL_ERROR", "message": str(e)},
+        )
+
+    if not result:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={"error_code": "PROJECT_NOT_FOUND", "message": "项目不存在"},
+        )
+
+    return OpenProjectDetailResponse(
+        code=0,
+        message="success",
+        data=OpenProjectDetailItem(**result),
+    )
+
+
+# --- Dataset download router ---
+
+download_router = APIRouter(
+    prefix="/api/v1/open",
+    tags=["open-api-datasets"],
+)
+
+
+@download_router.post("/projects/{project_id}/datasets/download", response_model=DatasetDownloadResponse)
+async def download_dataset(
+    project_id: str,
+    req: DatasetDownloadRequest,
+    _auth: dict = Depends(verify_open_api_token),
+):
+    """根据项目 ID 和格式导出标注数据集"""
+    # 1. Check project exists
+    project = _check_project(project_id)
+    if not project:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={"error_code": "PROJECT_NOT_FOUND", "message": "项目不存在"},
+        )
+
+    # 2. Check format compatibility
+    project_type = TASK_TO_PROJECT_TYPE.get(project["task_type"] or "", "text")
+    if req.format in TEXT_FORMATS and project_type != "text":
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail={"error_code": "FORMAT_NOT_COMPATIBLE", "message": f"格式 {req.format.value} 不适用于 {project_type} 类型项目"},
+        )
+    if req.format in IMAGE_FORMATS and project_type != "image":
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail={"error_code": "FORMAT_NOT_COMPATIBLE", "message": f"格式 {req.format.value} 不适用于 {project_type} 类型项目"},
+        )
+
+    # 3. Check data availability
+    status_filter = "completed" if req.completed_only else "all"
+    tasks = ExportService.get_tasks_with_annotations(project_id, status_filter)
+    if not tasks:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={"error_code": "NO_DATA_AVAILABLE", "message": "项目中没有可导出的数据"},
+        )
+
+    # 4. Execute export
+    format_val = req.format.value
+    export_method = {
+        "json": ExportService.export_to_json,
+        "csv": ExportService.export_to_csv,
+        "coco": ExportService.export_to_coco,
+        "yolo": ExportService.export_to_yolo,
+        "pascal_voc": ExportService.export_to_pascal_voc,
+        "sharegpt": ExportService.export_to_sharegpt,
+        "alpaca": ExportService.export_to_alpaca,
+    }.get(format_val)
+
+    if not export_method:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail={"error_code": "INVALID_FORMAT", "message": f"不支持的导出格式: {format_val}"},
+        )
+
+    try:
+        file_path, total_tasks, total_annotations = export_method(
+            project_id=project_id,
+            status_filter=status_filter,
+            include_metadata=False,
+        )
+    except Exception as e:
+        logger.error(f"Export failed for project {project_id}: {e}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail={"error_code": "EXPORT_FAILED", "message": f"导出失败: {str(e)}"},
+        )
+
+    # 5. Generate download token
+    now = datetime.now(timezone.utc)
+    expires_at = now + timedelta(hours=2)
+    download_token = create_download_token(
+        file_path=file_path,
+        project_id=project_id,
+        format_val=format_val,
+        total_exported=total_tasks,
+        expires_at=expires_at,
+    )
+
+    file_name = f"{project_id}_{format_val}_{now.strftime('%Y%m%d_%H%M%S')}.{_get_ext(format_val)}"
+    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
+
+    return DatasetDownloadResponse(
+        code=0,
+        message="success",
+        data=DatasetDownloadResponseData(
+            project_id=project_id,
+            format=format_val,
+            total_exported=total_tasks,
+            file_url=f"/api/v1/open/datasets/downloads/{download_token}",
+            file_name=file_name,
+            file_size=file_size,
+            expires_at=expires_at,
+            status="completed",
+        ),
+    )
+
+
+@download_router.get("/datasets/downloads/{download_token}")
+async def download_file(
+    download_token: str,
+    _auth: dict = Depends(verify_open_api_token),
+):
+    """根据下载令牌获取实际的数据集文件"""
+    info = get_download_info(download_token)
+    if not info:
+        raise HTTPException(
+            status_code=status.HTTP_410_GONE,
+            detail={"error_code": "DOWNLOAD_EXPIRED", "message": "下载链接已过期"},
+        )
+
+    if not os.path.exists(info["file_path"]):
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={"error_code": "FILE_NOT_FOUND", "message": "导出文件不存在"},
+        )
+
+    filename = os.path.basename(info["file_path"])
+    media_type = _get_media_type(info["format_val"])
+
+    return FileResponse(
+        path=info["file_path"],
+        media_type=media_type,
+        filename=filename,
+    )
+
+
+def _check_project(project_id: str) -> Optional[dict]:
+    """Check project exists and is in allowed status."""
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        cursor.execute(
+            "SELECT id, name, task_type, status FROM projects WHERE id = %s",
+            (project_id,),
+        )
+        row = cursor.fetchone()
+        if row:
+            return dict(row)
+    return None
+
+
+def _get_ext(format_val: str) -> str:
+    return "xml" if format_val == "pascal_voc" else "json" if format_val == "coco" else "csv" if format_val == "csv" else "json"
+
+
+def _get_media_type(format_val: str) -> str:
+    return "text/csv" if format_val == "csv" else "application/xml" if format_val == "pascal_voc" else "application/json"

+ 16 - 0
backend/schemas/open_auth.py

@@ -0,0 +1,16 @@
+"""
+Pydantic schemas for Open API authentication.
+"""
+from pydantic import BaseModel, Field
+
+
+class TokenResponseData(BaseModel):
+    access_token: str = Field(..., description="JWT 访问令牌")
+    token_type: str = Field(default="Bearer", description="令牌类型")
+    expires_in: int = Field(default=7200, description="过期时间(秒)")
+
+
+class TokenResponse(BaseModel):
+    code: int = Field(default=0, description="状态码")
+    message: str = Field(default="success", description="提示信息")
+    data: TokenResponseData

+ 44 - 0
backend/schemas/open_dataset.py

@@ -0,0 +1,44 @@
+"""
+Pydantic schemas for Open API dataset download endpoints.
+"""
+from enum import Enum
+from datetime import datetime
+from typing import Optional
+from pydantic import BaseModel, Field
+
+
+class DatasetFormat(str, Enum):
+    ALPACA = "alpaca"
+    SHAREGPT = "sharegpt"
+    JSON = "json"
+    CSV = "csv"
+    COCO = "coco"
+    YOLO = "yolo"
+    PASCAL_VOC = "pascal_voc"
+
+
+TEXT_FORMATS = {DatasetFormat.ALPACA, DatasetFormat.SHAREGPT}
+IMAGE_FORMATS = {DatasetFormat.JSON, DatasetFormat.CSV, DatasetFormat.COCO,
+                 DatasetFormat.YOLO, DatasetFormat.PASCAL_VOC}
+
+
+class DatasetDownloadRequest(BaseModel):
+    format: DatasetFormat = Field(..., description="数据集格式")
+    completed_only: bool = Field(default=True, description="是否只导出已完成的任务")
+
+
+class DatasetDownloadResponseData(BaseModel):
+    project_id: str = Field(..., description="项目ID")
+    format: str = Field(..., description="导出格式")
+    total_exported: int = Field(..., description="导出任务数")
+    file_url: str = Field(..., description="下载链接")
+    file_name: str = Field(..., description="文件名")
+    file_size: Optional[int] = Field(None, description="文件大小(字节)")
+    expires_at: Optional[datetime] = Field(None, description="链接过期时间")
+    status: str = Field(default="completed", description="导出状态")
+
+
+class DatasetDownloadResponse(BaseModel):
+    code: int = Field(default=0)
+    message: str = Field(default="success")
+    data: DatasetDownloadResponseData

+ 53 - 0
backend/schemas/open_project.py

@@ -0,0 +1,53 @@
+"""
+Pydantic schemas for Open API project endpoints.
+"""
+from enum import Enum
+from datetime import datetime
+from typing import Optional, List
+from pydantic import BaseModel, Field
+
+
+class ProjectType(str, Enum):
+    IMAGE = "image"
+    TEXT = "text"
+
+
+class OpenProjectItem(BaseModel):
+    project_id: str = Field(..., description="项目ID")
+    project_name: str = Field(..., description="项目名称")
+    description: str = Field(default="", description="项目描述")
+    project_type: ProjectType = Field(..., description="项目类型")
+    task_type: str = Field(..., description="任务类型")
+    status: str = Field(..., description="项目状态")
+    created_by: str = Field(default="", description="创建人")
+    created_at: datetime = Field(..., description="创建时间")
+    updated_at: Optional[datetime] = Field(None, description="更新时间")
+    task_count: int = Field(default=0, description="总任务数")
+    completed_task_count: int = Field(default=0, description="已完成任务数")
+
+
+class OpenProjectDetailItem(OpenProjectItem):
+    assigned_task_count: int = Field(default=0, description="已分配任务数")
+    completion_percentage: float = Field(default=0.0, description="完成率")
+
+
+class OpenProjectListData(BaseModel):
+    items: List[OpenProjectItem]
+    total: int
+    page: int
+    page_size: int
+    total_pages: int
+    has_next: bool
+    has_prev: bool
+
+
+class OpenProjectListResponse(BaseModel):
+    code: int = Field(default=0)
+    message: str = Field(default="success")
+    data: OpenProjectListData
+
+
+class OpenProjectDetailResponse(BaseModel):
+    code: int = Field(default=0)
+    message: str = Field(default="success")
+    data: OpenProjectDetailItem

+ 37 - 0
backend/scripts/seed_open_api_app.py

@@ -0,0 +1,37 @@
+"""
+Seed script: Create a test API application for Open API testing.
+Run: python scripts/seed_open_api_app.py
+"""
+import sys
+import uuid
+import secrets
+sys.path.insert(0, ".")
+
+from database import get_db_connection
+
+
+def seed_test_app(app_name: str = "模型训练平台测试", description: str = "测试用接入应用"):
+    app_id = f"app_id_{secrets.token_hex(8)}"
+    app_secret = secrets.token_hex(16)
+
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        cursor.execute(
+            """
+            INSERT INTO api_applications
+                (id, app_id, app_name, app_secret, status, description)
+            VALUES (%s, %s, %s, %s, %s, %s)
+            """,
+            (str(uuid.uuid4()), app_id, app_name, app_secret, "active", description),
+        )
+
+    print("=" * 60)
+    print("测试接入应用已创建!请妥善保存以下凭证:")
+    print("=" * 60)
+    print(f"  app_id:     {app_id}")
+    print(f"  app_secret: {app_secret}")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    seed_test_app()

+ 0 - 0
backend/services/api/__init__.py


+ 86 - 0
backend/services/api/open_auth_service.py

@@ -0,0 +1,86 @@
+"""
+Open API authentication service.
+Handles API Key + Secret HMAC-SHA256 signature verification and JWT token generation.
+"""
+import hmac
+import hashlib
+import time
+import logging
+
+import jwt
+from fastapi import HTTPException, status
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+_NONCE_CACHE: dict[str, int] = {}
+TIMESTAMP_TOLERANCE = 300  # ±5 minutes
+TOKEN_EXPIRE_SECONDS = 7200  # 2 hours
+
+
+def verify_signature(app_id: str, timestamp: str, nonce: str, signature: str, app_secret: str) -> bool:
+    """Verify HMAC-SHA256 signature."""
+    message = f"{app_id}{timestamp}{nonce}"
+    expected = hmac.new(app_secret.encode(), message.encode(), hashlib.sha256).hexdigest()
+    return hmac.compare_digest(expected, signature)
+
+
+def check_and_store_nonce(nonce: str) -> bool:
+    """Return True if nonce is duplicate (should reject). Stores nonce with expiry."""
+    now = int(time.time())
+    expired = [n for n, exp in _NONCE_CACHE.items() if exp < now]
+    for n in expired:
+        del _NONCE_CACHE[n]
+    if nonce in _NONCE_CACHE:
+        return True
+    _NONCE_CACHE[nonce] = now + TIMESTAMP_TOLERANCE * 2
+    return False
+
+
+def create_open_api_token(app_id: str, app_name: str) -> str:
+    """Create a JWT access token for Open API access."""
+    import datetime
+    expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=TOKEN_EXPIRE_SECONDS)
+    payload = {
+        "sub": app_id,
+        "app_id": app_id,
+        "app_name": app_name,
+        "iat": int(time.time()),
+        "exp": int(expire.timestamp()),
+        "type": "open_api_access",
+    }
+    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
+
+
+def validate_timestamp(timestamp_str: str) -> int:
+    """Validate that the request timestamp is within tolerance."""
+    try:
+        ts = int(timestamp_str)
+    except (ValueError, TypeError):
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳格式无效"},
+        )
+    if abs(int(time.time()) - ts) > TIMESTAMP_TOLERANCE:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳过期(超过 ±5 分钟)"},
+        )
+    return ts
+
+
+def get_application_by_app_id(db_conn, app_id: str) -> dict | None:
+    """Look up an application by app_id."""
+    cursor = db_conn.cursor()
+    cursor.execute(
+        "SELECT id, app_id, app_name, app_secret, status FROM api_applications WHERE app_id = %s",
+        (app_id,),
+    )
+    row = cursor.fetchone()
+    return dict(row) if row else None
+
+
+def update_last_used(db_conn, app_id: str):
+    """Update the last_used_at timestamp for an application."""
+    cursor = db_conn.cursor()
+    cursor.execute("UPDATE api_applications SET last_used_at = NOW() WHERE app_id = %s", (app_id,))

+ 208 - 0
backend/services/api/open_project_service.py

@@ -0,0 +1,208 @@
+"""
+Open API project query service.
+Handles project listing, detail, and dataset export queries.
+"""
+import math
+import uuid
+import logging
+from datetime import datetime, timezone
+from typing import Optional, Dict
+
+from database import get_db_connection
+
+logger = logging.getLogger(__name__)
+
+# task_type → project_type 映射
+TASK_TO_PROJECT_TYPE = {
+    "text_classification": "text",
+    "ner": "text",
+    "image_classification": "image",
+    "object_detection": "image",
+    "polygon": "image",
+}
+
+# 开放 API 允许访问的项目状态
+ALLOWED_STATUSES = ("ready", "in_progress", "completed")
+
+# Download token store: {token: {file_path, project_id, format_val, total_exported, expires_at}}
+_DOWNLOAD_STORE: Dict[str, Dict] = {}
+
+
+def _map_project_type(task_type: str) -> str:
+    """将数据库 task_type 映射为 API 返回的 project_type。"""
+    return TASK_TO_PROJECT_TYPE.get(task_type, "text")
+
+
+def list_projects(
+    name: Optional[str] = None,
+    project_type: Optional[str] = None,
+    status: Optional[str] = None,
+    page: int = 1,
+    page_size: int = 20,
+) -> dict:
+    """
+    查询项目列表。
+
+    返回包含 items、total、分页信息的字典。
+    仅返回状态为 ready/in_progress/completed 的项目。
+    """
+    page = max(page, 1)
+    page_size = min(max(page_size, 1), 100)
+
+    conditions = ["p.status IN %s"]
+    params = [ALLOWED_STATUSES]
+
+    if name:
+        conditions.append("p.name LIKE %s")
+        params.append(f"%{name}%")
+
+    if project_type:
+        allowed_types = [t for t, pt in TASK_TO_PROJECT_TYPE.items() if pt == project_type]
+        if allowed_types:
+            conditions.append("p.task_type IN %s")
+            params.append(tuple(allowed_types))
+
+    if status and status in ALLOWED_STATUSES:
+        conditions.append("p.status = %s")
+        params.append(status)
+
+    where = " AND ".join(conditions)
+
+    with get_db_connection() as conn:
+        # Count total
+        cursor = conn.cursor()
+        cursor.execute(f"SELECT COUNT(*) AS cnt FROM projects p WHERE {where}", tuple(params))
+        total = cursor.fetchone()["cnt"]
+
+        total_pages = max(math.ceil(total / page_size), 1) if total > 0 else 1
+        offset = (page - 1) * page_size
+
+        # Fetch items
+        cursor.execute(
+            f"""
+            SELECT p.id, p.name, p.description, p.task_type, p.status,
+                   p.created_at, p.updated_at,
+                   COUNT(t.id) AS task_count,
+                   SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) AS completed_task_count
+            FROM projects p
+            LEFT JOIN tasks t ON t.project_id = p.id
+            WHERE {where}
+            GROUP BY p.id
+            ORDER BY p.updated_at DESC
+            LIMIT %s OFFSET %s
+            """,
+            tuple(params) + (page_size, offset),
+        )
+        rows = cursor.fetchall()
+
+    items = []
+    for row in rows:
+        items.append({
+            "project_id": row["id"],
+            "project_name": row["name"],
+            "description": row["description"] or "",
+            "project_type": _map_project_type(row["task_type"] or ""),
+            "task_type": row["task_type"] or "",
+            "status": row["status"],
+            "created_by": "",
+            "created_at": row["created_at"],
+            "updated_at": row["updated_at"],
+            "task_count": int(row["task_count"]),
+            "completed_task_count": int(row["completed_task_count"]),
+        })
+
+    return {
+        "items": items,
+        "total": total,
+        "page": page,
+        "page_size": page_size,
+        "total_pages": total_pages,
+        "has_next": page < total_pages,
+        "has_prev": page > 1,
+    }
+
+
+def get_project_detail(project_id: str) -> Optional[dict]:
+    """
+    根据项目 ID 查询项目详情。
+    返回包含统计信息的完整项目信息。
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        cursor.execute(
+            """
+            SELECT p.id, p.name, p.description, p.task_type, p.status,
+                   p.created_at, p.updated_at,
+                   COUNT(t.id) AS task_count,
+                   SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) AS completed_task_count,
+                   SUM(CASE WHEN t.assigned_to IS NOT NULL THEN 1 ELSE 0 END) AS assigned_task_count
+            FROM projects p
+            LEFT JOIN tasks t ON t.project_id = p.id
+            WHERE p.id = %s
+            GROUP BY p.id
+            """,
+            (project_id,),
+        )
+        row = cursor.fetchone()
+        if not row:
+            return None
+
+    total = max(row["task_count"], 1)
+    completed = row["completed_task_count"]
+
+    return {
+        "project_id": row["id"],
+        "project_name": row["name"],
+        "description": row["description"] or "",
+        "project_type": _map_project_type(row["task_type"] or ""),
+        "task_type": row["task_type"] or "",
+        "status": row["status"],
+        "created_by": "",
+        "created_at": row["created_at"],
+        "updated_at": row["updated_at"],
+        "task_count": int(row["task_count"]),
+        "completed_task_count": int(row["completed_task_count"]),
+        "assigned_task_count": int(row["assigned_task_count"]),
+        "completion_percentage": round(completed / total * 100, 1),
+    }
+
+
+# --- Download token management ---
+
+def create_download_token(
+    file_path: str,
+    project_id: str,
+    format_val: str,
+    total_exported: int,
+    expires_at: datetime,
+) -> str:
+    """Create a download token and store the download info."""
+    token = f"dl_{uuid.uuid4().hex[:12]}"
+    _DOWNLOAD_STORE[token] = {
+        "file_path": file_path,
+        "project_id": project_id,
+        "format_val": format_val,
+        "total_exported": total_exported,
+        "expires_at": expires_at,
+    }
+    _cleanup_expired_tokens()
+    return token
+
+
+def get_download_info(token: str) -> Optional[Dict]:
+    """Get download info for a token. Returns None if expired or not found."""
+    info = _DOWNLOAD_STORE.get(token)
+    if not info:
+        return None
+    if info["expires_at"] < datetime.now(timezone.utc):
+        del _DOWNLOAD_STORE[token]
+        return None
+    return info
+
+
+def _cleanup_expired_tokens():
+    """Remove expired tokens."""
+    now = datetime.now(timezone.utc)
+    expired = [t for t, info in _DOWNLOAD_STORE.items() if info["expires_at"] < now]
+    for t in expired:
+        del _DOWNLOAD_STORE[t]

+ 50 - 0
backend/services/open_middleware.py

@@ -0,0 +1,50 @@
+"""
+Open API 认证中间件(依赖项)。
+验证 Open API 的 Bearer JWT Token,提取 app_id。
+与现有 AuthMiddleware 独立,不校验 role 字段。
+"""
+import jwt
+import logging
+from fastapi import Depends, HTTPException, status, Header
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+
+def verify_open_api_token(authorization: str = Header(...)) -> dict:
+    """
+    验证 Open API Bearer Token。
+    从 Authorization Header 中解析 JWT,确认 type=open_api_access。
+    返回 decoded payload,包含 app_id, app_name 等。
+    """
+    parts = authorization.split()
+    if len(parts) != 2 or parts[0].lower() != "bearer":
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail={"error_code": "TOKEN_INVALID", "message": "无效的认证令牌格式"},
+        )
+
+    token = parts[1]
+
+    try:
+        payload = jwt.decode(
+            token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
+        )
+    except jwt.ExpiredSignatureError:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail={"error_code": "TOKEN_EXPIRED", "message": "Access Token 已过期"},
+        )
+    except jwt.InvalidTokenError:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail={"error_code": "TOKEN_INVALID", "message": "无效的 Access Token"},
+        )
+
+    if payload.get("type") != "open_api_access":
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN,
+            detail={"error_code": "TOKEN_INVALID", "message": "无权访问 Open API"},
+        )
+
+    return payload