""" Task API router. Provides CRUD endpoints for task management. """ import uuid import json from typing import List, Optional from fastapi import APIRouter, HTTPException, status, Query, Request from database import get_db_connection from schemas.task import TaskCreate, TaskUpdate, TaskResponse from models import Task router = APIRouter( prefix="/api/tasks", tags=["tasks"] ) @router.get("", response_model=List[TaskResponse]) async def list_tasks( request: Request, project_id: Optional[str] = Query(None, description="Filter by project ID"), status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"), assigned_to: Optional[str] = Query(None, description="Filter by assigned user") ): """ List all tasks with optional filters. Args: request: FastAPI Request object (contains user info) project_id: Optional project ID filter status_filter: Optional status filter (pending, in_progress, completed) assigned_to: Optional assigned user filter Returns: List of tasks matching the filters Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() # Build query with filters query = """ SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COALESCE( CAST(COUNT(a.id) AS FLOAT) / NULLIF( (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 0 ), 0.0 ) as progress FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE 1=1 """ params = [] if project_id: query += " AND t.project_id = ?" params.append(project_id) if status_filter: query += " AND t.status = ?" params.append(status_filter) if assigned_to: query += " AND t.assigned_to = ?" params.append(assigned_to) query += " GROUP BY t.id ORDER BY t.created_at DESC" cursor.execute(query, params) rows = cursor.fetchall() tasks = [] for row in rows: # Parse JSON data data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=row["progress"] )) return tasks @router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task(request: Request, task: TaskCreate): """ Create a new task. Args: request: FastAPI Request object (contains user info) task: Task creation data Returns: Created task with generated ID Raises: HTTPException: 404 if project not found Requires authentication. Note: If assigned_to is not provided, the task will be assigned to the current user. """ # Generate unique ID task_id = f"task_{uuid.uuid4().hex[:12]}" # Get current user user = request.state.user # Use provided assigned_to or default to current user assigned_to = task.assigned_to if task.assigned_to else user["id"] with get_db_connection() as conn: cursor = conn.cursor() # Verify project exists cursor.execute("SELECT id FROM projects WHERE id = ?", (task.project_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id '{task.project_id}' not found" ) # Serialize data to JSON data_json = json.dumps(task.data) # Insert new task cursor.execute(""" INSERT INTO tasks (id, project_id, name, data, status, assigned_to) VALUES (?, ?, ?, ?, 'pending', ?) """, ( task_id, task.project_id, task.name, data_json, assigned_to )) # Fetch the created task cursor.execute(""" SELECT id, project_id, name, data, status, assigned_to, created_at FROM tasks WHERE id = ? """, (task_id,)) row = cursor.fetchone() # Parse JSON data data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=0.0 ) @router.get("/{task_id}", response_model=TaskResponse) async def get_task(request: Request, task_id: str): """ Get task by ID. Args: request: FastAPI Request object (contains user info) task_id: Task unique identifier Returns: Task details with progress Raises: HTTPException: 404 if task not found Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() # Get task with progress cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COALESCE( CAST(COUNT(a.id) AS FLOAT) / NULLIF( (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 0 ), 0.0 ) as progress FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id """, (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) # Parse JSON data data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=row["progress"] ) @router.put("/{task_id}", response_model=TaskResponse) async def update_task(request: Request, task_id: str, task: TaskUpdate): """ Update an existing task. Args: request: FastAPI Request object (contains user info) task_id: Task unique identifier task: Task update data Returns: Updated task details Raises: HTTPException: 404 if task not found Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() # Check if task exists cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) # Build update query dynamically based on provided fields update_fields = [] update_values = [] if task.name is not None: update_fields.append("name = ?") update_values.append(task.name) if task.data is not None: update_fields.append("data = ?") update_values.append(json.dumps(task.data)) if task.status is not None: update_fields.append("status = ?") update_values.append(task.status) if task.assigned_to is not None: update_fields.append("assigned_to = ?") update_values.append(task.assigned_to) if not update_fields: # No fields to update, just return current task cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COALESCE( CAST(COUNT(a.id) AS FLOAT) / NULLIF( (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 0 ), 0.0 ) as progress FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id """, (task_id,)) row = cursor.fetchone() data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=row["progress"] ) # Execute update update_values.append(task_id) cursor.execute(f""" UPDATE tasks SET {', '.join(update_fields)} WHERE id = ? """, update_values) # Fetch and return updated task cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COALESCE( CAST(COUNT(a.id) AS FLOAT) / NULLIF( (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 0 ), 0.0 ) as progress FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id """, (task_id,)) row = cursor.fetchone() data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=row["progress"] ) @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_task(request: Request, task_id: str): """ Delete a task and all associated annotations. Args: request: FastAPI Request object (contains user info) task_id: Task unique identifier Raises: HTTPException: 404 if task not found HTTPException: 403 if user is not admin Requires authentication and admin role. """ # Check if user has admin role user = request.state.user if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以删除任务" ) with get_db_connection() as conn: cursor = conn.cursor() # Check if task exists cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) # Delete task (cascade will delete annotations) cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) return None @router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse]) async def get_project_tasks(request: Request, project_id: str): """ Get all tasks for a specific project. Args: request: FastAPI Request object (contains user info) project_id: Project unique identifier Returns: List of tasks belonging to the project Raises: HTTPException: 404 if project not found Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() # Verify project exists cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id '{project_id}' not found" ) # Get all tasks for the project cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COALESCE( CAST(COUNT(a.id) AS FLOAT) / NULLIF( (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 0 ), 0.0 ) as progress FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.project_id = ? GROUP BY t.id ORDER BY t.created_at DESC """, (project_id,)) rows = cursor.fetchall() tasks = [] for row in rows: # Parse JSON data data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=row["progress"] )) return tasks