""" Annotation API router. Provides CRUD endpoints for annotation management. """ import uuid import json from typing import List, Optional from fastapi import APIRouter, HTTPException, status, Query from database import get_db_connection from schemas.annotation import AnnotationCreate, AnnotationUpdate, AnnotationResponse from models import Annotation router = APIRouter( prefix="/api/annotations", tags=["annotations"] ) @router.get("", response_model=List[AnnotationResponse]) async def list_annotations( task_id: Optional[str] = Query(None, description="Filter by task ID"), user_id: Optional[str] = Query(None, description="Filter by user ID") ): """ List all annotations with optional filters. Args: task_id: Optional task ID filter user_id: Optional user ID filter Returns: List of annotations matching the filters """ with get_db_connection() as conn: cursor = conn.cursor() # Build query with filters query = """ SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE 1=1 """ params = [] if task_id: query += " AND task_id = ?" params.append(task_id) if user_id: query += " AND user_id = ?" params.append(user_id) query += " ORDER BY created_at DESC" cursor.execute(query, params) rows = cursor.fetchall() annotations = [] for row in rows: # Parse JSON result result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"] annotations.append(AnnotationResponse( id=row["id"], task_id=row["task_id"], user_id=row["user_id"], result=result, created_at=row["created_at"], updated_at=row["updated_at"] )) return annotations @router.post("", response_model=AnnotationResponse, status_code=status.HTTP_201_CREATED) async def create_annotation(annotation: AnnotationCreate): """ Create a new annotation. Args: annotation: Annotation creation data Returns: Created annotation with generated ID Raises: HTTPException: 404 if task not found """ # Generate unique ID annotation_id = f"ann_{uuid.uuid4().hex[:12]}" with get_db_connection() as conn: cursor = conn.cursor() # Verify task exists cursor.execute("SELECT id FROM tasks WHERE id = ?", (annotation.task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{annotation.task_id}' not found" ) # Serialize result to JSON result_json = json.dumps(annotation.result) # Insert new annotation cursor.execute(""" INSERT INTO annotations (id, task_id, user_id, result) VALUES (?, ?, ?, ?) """, ( annotation_id, annotation.task_id, annotation.user_id, result_json )) # Fetch the created annotation cursor.execute(""" SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE id = ? """, (annotation_id,)) row = cursor.fetchone() # Parse JSON result result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"] return AnnotationResponse( id=row["id"], task_id=row["task_id"], user_id=row["user_id"], result=result, created_at=row["created_at"], updated_at=row["updated_at"] ) @router.get("/{annotation_id}", response_model=AnnotationResponse) async def get_annotation(annotation_id: str): """ Get annotation by ID. Args: annotation_id: Annotation unique identifier Returns: Annotation details Raises: HTTPException: 404 if annotation not found """ with get_db_connection() as conn: cursor = conn.cursor() # Get annotation cursor.execute(""" SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE id = ? """, (annotation_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Annotation with id '{annotation_id}' not found" ) # Parse JSON result result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"] return AnnotationResponse( id=row["id"], task_id=row["task_id"], user_id=row["user_id"], result=result, created_at=row["created_at"], updated_at=row["updated_at"] ) @router.put("/{annotation_id}", response_model=AnnotationResponse) async def update_annotation(annotation_id: str, annotation: AnnotationUpdate): """ Update an existing annotation. Args: annotation_id: Annotation unique identifier annotation: Annotation update data Returns: Updated annotation details Raises: HTTPException: 404 if annotation not found """ with get_db_connection() as conn: cursor = conn.cursor() # Check if annotation exists cursor.execute("SELECT id FROM annotations WHERE id = ?", (annotation_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Annotation with id '{annotation_id}' not found" ) # Serialize result to JSON result_json = json.dumps(annotation.result) # Update annotation cursor.execute(""" UPDATE annotations SET result = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (result_json, annotation_id)) # Fetch and return updated annotation cursor.execute(""" SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE id = ? """, (annotation_id,)) row = cursor.fetchone() # Parse JSON result result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"] return AnnotationResponse( id=row["id"], task_id=row["task_id"], user_id=row["user_id"], result=result, created_at=row["created_at"], updated_at=row["updated_at"] ) @router.get("/tasks/{task_id}/annotations", response_model=List[AnnotationResponse]) async def get_task_annotations(task_id: str): """ Get all annotations for a specific task. Args: task_id: Task unique identifier Returns: List of annotations belonging to the task Raises: HTTPException: 404 if task not found """ with get_db_connection() as conn: cursor = conn.cursor() # Verify task exists cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) # Get all annotations for the task cursor.execute(""" SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE task_id = ? ORDER BY created_at DESC """, (task_id,)) rows = cursor.fetchall() annotations = [] for row in rows: # Parse JSON result result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"] annotations.append(AnnotationResponse( id=row["id"], task_id=row["task_id"], user_id=row["user_id"], result=result, created_at=row["created_at"], updated_at=row["updated_at"] )) return annotations