"""
Task API router.

Provides CRUD endpoints for task management.
"""

import json
import uuid
from typing import Any, List, Optional, Sequence

from fastapi import APIRouter, HTTPException, status, Query

from database import get_db_connection
from schemas.task import TaskCreate, TaskUpdate, TaskResponse
from models import Task

router = APIRouter(
    prefix="/api/tasks",
    tags=["tasks"]
)

# NOTE(review): every endpoint below is ``async def`` but the database calls
# are made through a plain ``with`` block and are never awaited, so they look
# synchronous and would block the event loop while they run. Confirm whether
# these handlers should be plain ``def`` (FastAPI then runs them in a worker
# thread) before changing it -- direct callers may ``await`` them today.

# Shared SELECT used by every endpoint that reports progress. Progress is the
# number of annotation rows joined to the task divided by the number of
# entries under ``$.items`` in the task's JSON ``data``. NULLIF turns a zero
# item count into NULL so the division cannot fail, and COALESCE maps that
# NULL to 0.0. Callers append their own WHERE / GROUP BY / ORDER BY tail;
# a ``GROUP BY t.id`` is required because of the COUNT aggregate.
_TASK_WITH_PROGRESS_SELECT = """
    SELECT
        t.id,
        t.project_id,
        t.name,
        t.data,
        t.status,
        t.assigned_to,
        t.created_at,
        COALESCE(
            CAST(COUNT(a.id) AS FLOAT) / NULLIF(
                (SELECT COUNT(*) FROM json_each(t.data, '$.items')),
                0
            ),
            0.0
        ) as progress
    FROM tasks t
    LEFT JOIN annotations a ON t.id = a.task_id
"""


def _row_to_task(row: Any, progress: Optional[float] = None) -> TaskResponse:
    """Build a TaskResponse from a task row accessed by column name.

    Args:
        row: Row supporting ``row["column"]`` access for the task columns.
        progress: Explicit progress value; when None, ``row["progress"]``
            is used instead.

    Returns:
        The response model populated from the row.
    """
    raw_data = row["data"]
    # The data column may come back as JSON text or as an already-decoded value.
    data = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
    return TaskResponse(
        id=row["id"],
        project_id=row["project_id"],
        name=row["name"],
        data=data,
        status=row["status"],
        assigned_to=row["assigned_to"],
        created_at=row["created_at"],
        progress=row["progress"] if progress is None else progress
    )


def _fetch_task(cursor: Any, task_id: str) -> Optional[TaskResponse]:
    """Return the task with the given ID including progress, or None if absent."""
    cursor.execute(
        _TASK_WITH_PROGRESS_SELECT + " WHERE t.id = ? GROUP BY t.id",
        (task_id,)
    )
    row = cursor.fetchone()
    return _row_to_task(row) if row else None


def _query_tasks(cursor: Any, tail: str, params: Sequence[Any]) -> List[TaskResponse]:
    """Run the shared progress SELECT with the given SQL tail and map all rows."""
    cursor.execute(_TASK_WITH_PROGRESS_SELECT + tail, params)
    return [_row_to_task(row) for row in cursor.fetchall()]


def _ensure_project_exists(cursor: Any, project_id: str) -> None:
    """Raise a 404 HTTPException unless a project row with this ID exists."""
    cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
    if not cursor.fetchone():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Project with id '{project_id}' not found"
        )


def _ensure_task_exists(cursor: Any, task_id: str) -> None:
    """Raise a 404 HTTPException unless a task row with this ID exists."""
    cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
    if not cursor.fetchone():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task with id '{task_id}' not found"
        )


@router.get("", response_model=List[TaskResponse])
async def list_tasks(
    project_id: Optional[str] = Query(None, description="Filter by project ID"),
    status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"),
    assigned_to: Optional[str] = Query(None, description="Filter by assigned user")
):
    """
    List all tasks with optional filters.

    Args:
        project_id: Optional project ID filter
        status_filter: Optional status filter (pending, in_progress, completed)
        assigned_to: Optional assigned user filter

    Returns:
        List of tasks matching the filters, newest first
    """
    # Each entry pairs a fixed SQL condition with the value supplied by the
    # caller; falsy values (None or empty string) mean "no filter".
    candidate_filters = (
        ("t.project_id = ?", project_id),
        ("t.status = ?", status_filter),
        ("t.assigned_to = ?", assigned_to),
    )
    conditions = []
    params = []
    for condition, value in candidate_filters:
        if value:
            conditions.append(condition)
            params.append(value)

    tail = ""
    if conditions:
        tail += " WHERE " + " AND ".join(conditions)
    tail += " GROUP BY t.id ORDER BY t.created_at DESC"

    with get_db_connection() as conn:
        cursor = conn.cursor()
        return _query_tasks(cursor, tail, params)


@router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
async def create_task(task: TaskCreate):
    """
    Create a new task.

    Args:
        task: Task creation data

    Returns:
        Created task with generated ID

    Raises:
        HTTPException: 404 if project not found
    """
    # Generate unique ID: "task_" followed by the first 12 hex digits of a UUID4
    task_id = f"task_{uuid.uuid4().hex[:12]}"

    # NOTE(review): nothing here commits explicitly; presumably the
    # get_db_connection context manager commits on exit -- confirm.
    with get_db_connection() as conn:
        cursor = conn.cursor()

        _ensure_project_exists(cursor, task.project_id)

        # New tasks always start in the 'pending' status.
        cursor.execute("""
            INSERT INTO tasks (id, project_id, name, data, status, assigned_to)
            VALUES (?, ?, ?, ?, 'pending', ?)
        """, (
            task_id,
            task.project_id,
            task.name,
            json.dumps(task.data),
            task.assigned_to
        ))

        # Read the row back so values not supplied here (created_at) are included.
        cursor.execute("""
            SELECT id, project_id, name, data, status, assigned_to, created_at
            FROM tasks
            WHERE id = ?
        """, (task_id,))
        row = cursor.fetchone()

        # This query selects no progress column; a new task reports 0.0.
        return _row_to_task(row, progress=0.0)


@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str):
    """
    Get task by ID.

    Args:
        task_id: Task unique identifier

    Returns:
        Task details with progress

    Raises:
        HTTPException: 404 if task not found
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        found = _fetch_task(cursor, task_id)

    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task with id '{task_id}' not found"
        )
    return found


@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(task_id: str, task: TaskUpdate):
    """
    Update an existing task.

    Only fields whose value is not None are written; if none are provided
    the current task is returned unchanged.

    Args:
        task_id: Task unique identifier
        task: Task update data

    Returns:
        Updated task details

    Raises:
        HTTPException: 404 if task not found
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        _ensure_task_exists(cursor, task_id)

        # Column -> new value, in a fixed order. The column names are literals
        # defined here, never user input, so formatting them into the SQL
        # below is safe; the values always go through placeholders.
        requested = {
            "name": task.name,
            "data": json.dumps(task.data) if task.data is not None else None,
            "status": task.status,
            "assigned_to": task.assigned_to,
        }
        changes = {
            column: value
            for column, value in requested.items()
            if value is not None
        }

        if changes:
            set_clause = ", ".join(f"{column} = ?" for column in changes)
            cursor.execute(
                f"UPDATE tasks SET {set_clause} WHERE id = ?",
                [*changes.values(), task_id]
            )

        # Fetch and return the (possibly updated) task
        updated = _fetch_task(cursor, task_id)

    if updated is None:
        # The row vanished between the existence check and the re-read
        # (e.g. a concurrent delete); report it like any other missing task.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task with id '{task_id}' not found"
        )
    return updated


@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task(task_id: str):
    """
    Delete a task.

    Args:
        task_id: Task unique identifier

    Raises:
        HTTPException: 404 if task not found
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        _ensure_task_exists(cursor, task_id)

        # Only the task row is deleted here. Associated annotations are
        # expected to be removed by a cascade in the schema -- TODO confirm
        # the foreign key is declared with ON DELETE CASCADE and enforced.
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))

    return None


@router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse])
async def get_project_tasks(project_id: str):
    """
    Get all tasks for a specific project.

    Args:
        project_id: Project unique identifier

    Returns:
        List of tasks belonging to the project, newest first

    Raises:
        HTTPException: 404 if project not found
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        _ensure_project_exists(cursor, project_id)

        return _query_tasks(
            cursor,
            " WHERE t.project_id = ? GROUP BY t.id ORDER BY t.created_at DESC",
            (project_id,)
        )