""" Task API router. Provides CRUD endpoints for task management. """ import uuid import json from typing import List, Optional from fastapi import APIRouter, HTTPException, status, Query, Request from database import get_db_connection from schemas.task import TaskCreate, TaskUpdate, TaskResponse from models import Task router = APIRouter( prefix="/api/tasks", tags=["tasks"] ) def calculate_progress(data_str: str, annotation_count: int) -> float: """计算任务进度""" try: data = json.loads(data_str) if isinstance(data_str, str) else data_str items = data.get('items', []) if not items: return 0.0 return min(annotation_count / len(items), 1.0) except: return 0.0 @router.get("", response_model=List[TaskResponse]) async def list_tasks( request: Request, project_id: Optional[str] = Query(None, description="Filter by project ID"), status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"), assigned_to: Optional[str] = Query(None, description="Filter by assigned user") ): """ List all tasks with optional filters. Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() # Build query with filters query = """ SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE 1=1 """ params = [] if project_id: query += " AND t.project_id = ?" params.append(project_id) if status_filter: query += " AND t.status = ?" params.append(status_filter) if assigned_to: query += " AND t.assigned_to = ?" params.append(assigned_to) query += " GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC" cursor.execute(query, tuple(params)) rows = cursor.fetchall() tasks = [] for row in rows: data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress )) return tasks @router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task(request: Request, task: TaskCreate): """ Create a new task. Requires authentication. """ task_id = f"task_{uuid.uuid4().hex[:12]}" user = request.state.user assigned_to = task.assigned_to if task.assigned_to else user["id"] with get_db_connection() as conn: cursor = conn.cursor() # Verify project exists cursor.execute("SELECT id FROM projects WHERE id = ?", (task.project_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id '{task.project_id}' not found" ) data_json = json.dumps(task.data) cursor.execute(""" INSERT INTO tasks (id, project_id, name, data, status, assigned_to) VALUES (?, ?, ?, ?, 'pending', ?) """, (task_id, task.project_id, task.name, data_json, assigned_to)) cursor.execute(""" SELECT id, project_id, name, data, status, assigned_to, created_at FROM tasks WHERE id = ? """, (task_id,)) row = cursor.fetchone() data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=0.0 ) @router.get("/{task_id}", response_model=TaskResponse) async def get_task(request: Request, task_id: str): """ Get task by ID. Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at """, (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress ) @router.put("/{task_id}", response_model=TaskResponse) async def update_task(request: Request, task_id: str, task: TaskUpdate): """ Update an existing task. Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) update_fields = [] update_values = [] if task.name is not None: update_fields.append("name = ?") update_values.append(task.name) if task.data is not None: update_fields.append("data = ?") update_values.append(json.dumps(task.data)) if task.status is not None: update_fields.append("status = ?") update_values.append(task.status) if task.assigned_to is not None: update_fields.append("assigned_to = ?") update_values.append(task.assigned_to) if update_fields: update_values.append(task_id) cursor.execute(f""" UPDATE tasks SET {', '.join(update_fields)} WHERE id = ? """, tuple(update_values)) cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at """, (task_id,)) row = cursor.fetchone() data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress ) @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_task(request: Request, task_id: str): """ Delete a task and all associated annotations. Requires authentication and admin role. """ user = request.state.user if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以删除任务" ) with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) return None @router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse]) async def get_project_tasks(request: Request, project_id: str): """ Get all tasks for a specific project. Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id '{project_id}' not found" ) cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.project_id = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC """, (project_id,)) rows = cursor.fetchall() tasks = [] for row in rows: data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress )) return tasks