"""
Project API router.

Provides CRUD endpoints for project management, plus admin-only endpoints for
moving a project through its status workflow and updating its labeling
configuration.
"""

import uuid
from datetime import datetime
from typing import Any, List, Optional

from fastapi import APIRouter, HTTPException, status, Request, Query

from database import get_db_connection
from schemas.project import (
    ProjectCreate,
    ProjectUpdate,
    ProjectResponse,
    ProjectStatus,
    ProjectSource,
    ProjectStatusUpdate,
    ProjectConfigUpdate,
    ProjectResponseExtended,
)
from models import Project

router = APIRouter(
    prefix="/api/projects",
    tags=["projects"]
)

# Allowed status transitions: current status -> statuses it may move to.
VALID_STATUS_TRANSITIONS = {
    ProjectStatus.DRAFT: [ProjectStatus.CONFIGURING],
    ProjectStatus.CONFIGURING: [ProjectStatus.READY, ProjectStatus.DRAFT],
    ProjectStatus.READY: [ProjectStatus.IN_PROGRESS, ProjectStatus.CONFIGURING],
    ProjectStatus.IN_PROGRESS: [ProjectStatus.COMPLETED, ProjectStatus.READY],
    ProjectStatus.COMPLETED: [ProjectStatus.IN_PROGRESS],  # reopening is allowed
}

# ---------------------------------------------------------------------------
# Shared SQL fragments
# ---------------------------------------------------------------------------

# Project columns returned by every "extended" query.
_EXTENDED_PROJECT_COLUMNS = (
    "p.id, p.name, p.description, p.config, p.task_type, "
    "p.status, p.source, p.external_id, p.created_at, p.updated_at"
)

# Extended projection counting ALL tasks of each project. Callers append their
# own WHERE / GROUP BY / ORDER BY clauses.
_SELECT_EXTENDED_ALL_TASKS = f"""
    SELECT {_EXTENDED_PROJECT_COLUMNS},
           COUNT(t.id) as task_count,
           SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_task_count,
           SUM(CASE WHEN t.assigned_to IS NOT NULL THEN 1 ELSE 0 END) as assigned_task_count
    FROM projects p
    LEFT JOIN tasks t ON p.id = t.project_id
"""

# Extended projection counting only the tasks assigned to one user. The first
# bound parameter is that user's id (consumed by the JOIN condition).
_SELECT_EXTENDED_ASSIGNED_TASKS = f"""
    SELECT {_EXTENDED_PROJECT_COLUMNS},
           COUNT(t.id) as task_count,
           SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_task_count,
           COUNT(t.id) as assigned_task_count
    FROM projects p
    INNER JOIN tasks t ON p.id = t.project_id AND t.assigned_to = ?
"""

# Basic projection of a single project together with its total task count.
_SELECT_PROJECT_WITH_TASK_COUNT = """
    SELECT p.id, p.name, p.description, p.config, p.created_at,
           COUNT(t.id) as task_count
    FROM projects p
    LEFT JOIN tasks t ON p.id = t.project_id
    WHERE p.id = ?
    GROUP BY p.id
"""


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _require_admin(request: Request, detail: str) -> None:
    """Raise 403 with ``detail`` unless the authenticated user's role is admin."""
    if request.state.user["role"] != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=detail
        )


def _parse_status(raw: Any) -> ProjectStatus:
    """Convert a stored status value to ``ProjectStatus``; empty/NULL means DRAFT."""
    return ProjectStatus(raw) if raw else ProjectStatus.DRAFT


def _parse_source(raw: Any) -> ProjectSource:
    """Convert a stored source value to ``ProjectSource``; empty/NULL means INTERNAL."""
    return ProjectSource(raw) if raw else ProjectSource.INTERNAL


def _to_project_response(row: Any, task_count: int) -> ProjectResponse:
    """Build a ``ProjectResponse`` from a project row and an explicit task count."""
    return ProjectResponse(
        id=row["id"],
        name=row["name"],
        description=row["description"] or "",
        config=row["config"],
        created_at=row["created_at"],
        task_count=task_count
    )


def _to_extended_response(row: Any) -> ProjectResponseExtended:
    """Build a ``ProjectResponseExtended`` from a row of one of the extended queries."""
    return ProjectResponseExtended(
        id=row["id"],
        name=row["name"],
        description=row["description"] or "",
        config=row["config"],
        task_type=row["task_type"],
        status=_parse_status(row["status"]),
        source=_parse_source(row["source"]),
        external_id=row["external_id"],
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        # SUM() over zero joined rows yields NULL, hence the "or 0" fallbacks.
        task_count=row["task_count"] or 0,
        completed_task_count=row["completed_task_count"] or 0,
        assigned_task_count=row["assigned_task_count"] or 0,
    )


def _fetch_project_with_task_count(cursor: Any, project_id: str) -> Any:
    """Return the basic project row with ``task_count``, or None if it does not exist."""
    cursor.execute(_SELECT_PROJECT_WITH_TASK_COUNT, (project_id,))
    return cursor.fetchone()


def _fetch_extended_project(cursor: Any, project_id: str) -> Any:
    """Return the extended project row (with task statistics), or None if missing."""
    cursor.execute(
        _SELECT_EXTENDED_ALL_TASKS + " WHERE p.id = ? GROUP BY p.id",
        (project_id,),
    )
    return cursor.fetchone()


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@router.get("", response_model=List[ProjectResponseExtended])
async def list_projects(
    request: Request,
    status_filter: Optional[ProjectStatus] = Query(None, alias="status", description="按状态筛选"),
    source_filter: Optional[ProjectSource] = Query(None, alias="source", description="按来源筛选"),
):
    """
    List projects with extended information.

    For admin users: Returns all projects with their total task counts.
    For annotator users: Returns only projects that have tasks assigned to them,
    with task counts reflecting only their assigned tasks.

    Query Parameters:
        status: Filter by project status (draft, configuring, ready, in_progress, completed)
        source: Filter by project source (internal, external)

    Requires authentication.
    """
    user = request.state.user
    user_id = user["id"]
    user_role = user["role"]

    if user_role == "admin":
        # Admin: every project, statistics over all of its tasks.
        query = _SELECT_EXTENDED_ALL_TASKS
        params = []
        tail = " GROUP BY p.id ORDER BY p.created_at DESC"
    else:
        # Annotator: only projects with tasks assigned to them; the counts
        # cover just those assigned tasks.
        query = _SELECT_EXTENDED_ASSIGNED_TASKS
        params = [user_id]
        tail = " GROUP BY p.id HAVING COUNT(t.id) > 0 ORDER BY p.created_at DESC"

    # Optional filters are identical for both roles; their parameters follow
    # the JOIN parameter (if any) in placeholder order.
    conditions = []
    if status_filter is not None:
        conditions.append("p.status = ?")
        params.append(status_filter.value)
    if source_filter is not None:
        conditions.append("p.source = ?")
        params.append(source_filter.value)
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    query += tail

    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        rows = cursor.fetchall()

        return [_to_extended_response(row) for row in rows]


@router.post("", response_model=ProjectResponse, status_code=status.HTTP_201_CREATED)
async def create_project(request: Request, project: ProjectCreate):
    """
    Create a new project.

    Args:
        request: FastAPI Request object (contains user info)
        project: Project creation data

    Returns:
        Created project with generated ID

    Requires authentication.
    """
    # Generate a unique ID: "proj_" plus the first 12 hex chars of a UUID4.
    project_id = f"proj_{uuid.uuid4().hex[:12]}"

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Insert the new project, including its task_type.
        cursor.execute("""
            INSERT INTO projects (id, name, description, config, task_type)
            VALUES (?, ?, ?, ?, ?)
        """, (
            project_id,
            project.name,
            project.description,
            project.config,
            project.task_type
        ))

        # Re-read the row so the response carries the stored created_at value.
        cursor.execute("""
            SELECT id, name, description, config, created_at
            FROM projects
            WHERE id = ?
        """, (project_id,))
        row = cursor.fetchone()

        # A freshly created project has no tasks yet.
        return _to_project_response(row, task_count=0)


@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(request: Request, project_id: str):
    """
    Get project by ID.

    Args:
        request: FastAPI Request object (contains user info)
        project_id: Project unique identifier

    Returns:
        Project details with task count

    Raises:
        HTTPException: 404 if project not found

    Requires authentication.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        row = _fetch_project_with_task_count(cursor, project_id)
        if not row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with id '{project_id}' not found"
            )

        return _to_project_response(row, task_count=row["task_count"])


@router.put("/{project_id}", response_model=ProjectResponse)
async def update_project(request: Request, project_id: str, project: ProjectUpdate):
    """
    Update an existing project.

    Only the fields provided (non-None) among name, description and config are
    written. When at least one field changes, ``updated_at`` is refreshed as
    well, matching the status and config endpoints.

    Args:
        request: FastAPI Request object (contains user info)
        project_id: Project unique identifier
        project: Project update data

    Returns:
        Updated project details

    Raises:
        HTTPException: 404 if project not found

    Requires authentication.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Check that the project exists before attempting any write.
        cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
        if not cursor.fetchone():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with id '{project_id}' not found"
            )

        # Build the SET clause from the provided fields. Column names are
        # hard-coded here; only the values are bound as parameters.
        update_fields = []
        update_values = []
        for column, value in (
            ("name", project.name),
            ("description", project.description),
            ("config", project.config),
        ):
            if value is not None:
                update_fields.append(f"{column} = ?")
                update_values.append(value)

        if update_fields:
            # Keep the modification timestamp in step with the content change.
            update_fields.append("updated_at = ?")
            update_values.append(datetime.now())
            update_values.append(project_id)
            cursor.execute(f"""
                UPDATE projects
                SET {', '.join(update_fields)}
                WHERE id = ?
            """, update_values)

        # Whether or not anything was written, return the current project state.
        row = _fetch_project_with_task_count(cursor, project_id)
        return _to_project_response(row, task_count=row["task_count"])


@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_project(request: Request, project_id: str):
    """
    Delete a project and all associated tasks.

    Args:
        request: FastAPI Request object (contains user info)
        project_id: Project unique identifier

    Raises:
        HTTPException: 404 if project not found
        HTTPException: 403 if user is not admin

    Requires authentication and admin role.
    """
    _require_admin(request, "只有管理员可以删除项目")

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Check that the project exists so a missing id yields 404, not 204.
        cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
        if not cursor.fetchone():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with id '{project_id}' not found"
            )

        # Delete the project. NOTE(review): removal of tasks and annotations
        # presumably relies on ON DELETE CASCADE in the schema -- confirm.
        cursor.execute("DELETE FROM projects WHERE id = ?", (project_id,))

    return None


@router.put("/{project_id}/status", response_model=ProjectResponseExtended)
async def update_project_status(request: Request, project_id: str, status_update: ProjectStatusUpdate):
    """
    Update project status with validation.

    Only allows valid status transitions:
    - draft → configuring
    - configuring → ready, draft
    - ready → in_progress, configuring
    - in_progress → completed, ready
    - completed → in_progress (reopen)

    Args:
        request: FastAPI Request object (contains user info)
        project_id: Project unique identifier
        status_update: New status

    Returns:
        Updated project details

    Raises:
        HTTPException: 404 if project not found
        HTTPException: 400 if status transition is invalid
        HTTPException: 403 if user is not admin
    """
    _require_admin(request, "只有管理员可以更新项目状态")

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Read the current status to validate the requested transition.
        cursor.execute("SELECT id, status FROM projects WHERE id = ?", (project_id,))
        row = cursor.fetchone()
        if not row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"项目 '{project_id}' 不存在"
            )

        current_status = _parse_status(row["status"])
        new_status = status_update.status

        # Reject any transition not listed in VALID_STATUS_TRANSITIONS.
        valid_transitions = VALID_STATUS_TRANSITIONS.get(current_status, [])
        if new_status not in valid_transitions:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"无效的状态转换: {current_status.value} → {new_status.value}。允许的转换: {[s.value for s in valid_transitions]}"
            )

        cursor.execute("""
            UPDATE projects
            SET status = ?, updated_at = ?
            WHERE id = ?
        """, (new_status.value, datetime.now(), project_id))

        # Return the updated project together with its task statistics.
        return _to_extended_response(_fetch_extended_project(cursor, project_id))


@router.put("/{project_id}/config", response_model=ProjectResponseExtended)
async def update_project_config(request: Request, project_id: str, config_update: ProjectConfigUpdate):
    """
    Update project configuration (XML config and labels).

    This endpoint is used by admins to configure the labeling interface
    for projects created by external systems. A project in draft status is
    moved to configuring as part of the update.

    Args:
        request: FastAPI Request object (contains user info)
        project_id: Project unique identifier
        config_update: New configuration

    Returns:
        Updated project details

    Raises:
        HTTPException: 404 if project not found
        HTTPException: 400 if config is invalid
        HTTPException: 403 if user is not admin
    """
    _require_admin(request, "只有管理员可以更新项目配置")

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Check that the project exists and read its current status.
        cursor.execute("SELECT id, status FROM projects WHERE id = ?", (project_id,))
        row = cursor.fetchone()
        if not row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"项目 '{project_id}' 不存在"
            )

        current_status = _parse_status(row["status"])

        # Config may only be changed while the project is in draft or configuring.
        if current_status not in [ProjectStatus.DRAFT, ProjectStatus.CONFIGURING]:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"只能在 draft 或 configuring 状态下更新配置,当前状态: {current_status.value}"
            )

        # Basic shape check only: the text must start with "<" and end with ">".
        # This does not verify that the document is well-formed XML.
        config = config_update.config.strip()
        if not config.startswith("<") or not config.endswith(">"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="无效的XML配置格式"
            )

        # A draft project advances to configuring; otherwise the status is kept.
        new_status = ProjectStatus.CONFIGURING if current_status == ProjectStatus.DRAFT else current_status

        # task_type is written only when the request supplies a non-empty value.
        assignments = ["config = ?"]
        values = [config]
        if config_update.task_type:
            assignments.append("task_type = ?")
            values.append(config_update.task_type)
        assignments.extend(["status = ?", "updated_at = ?"])
        values.extend([new_status.value, datetime.now(), project_id])

        cursor.execute(f"""
            UPDATE projects
            SET {', '.join(assignments)}
            WHERE id = ?
        """, values)

        # Return the updated project together with its task statistics.
        return _to_extended_response(_fetch_extended_project(cursor, project_id))