| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- """
- Annotation API router.
- Provides CRUD endpoints for annotation management.
- """
- import uuid
- import json
- from typing import List, Optional
- from fastapi import APIRouter, HTTPException, status, Query
- from database import get_db_connection
- from schemas.annotation import AnnotationCreate, AnnotationUpdate, AnnotationResponse
- from models import Annotation
# Router for the annotation endpoints: every route registered on it shares
# the "/api/annotations" prefix and the "annotations" tag.
router = APIRouter(prefix="/api/annotations", tags=["annotations"])
@router.get("", response_model=List[AnnotationResponse])
async def list_annotations(
    task_id: Optional[str] = Query(None, description="Filter by task ID"),
    user_id: Optional[str] = Query(None, description="Filter by user ID"),
):
    """
    Return annotations, newest first, optionally narrowed by task and/or user.

    Args:
        task_id: When given (and non-empty), keep only annotations of this task
        user_id: When given (and non-empty), keep only annotations of this user

    Returns:
        List of matching annotations ordered by created_at descending
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # "WHERE 1=1" lets every optional filter be appended as " AND ...".
        sql = """
            SELECT
                id,
                task_id,
                user_id,
                result,
                created_at,
                updated_at
            FROM annotations
            WHERE 1=1
        """
        bound_values = []

        # Column names are fixed literals; only the values come from the
        # request, and those are always bound through "?" placeholders.
        for column, wanted in (("task_id", task_id), ("user_id", user_id)):
            if wanted:
                sql += f" AND {column} = ?"
                bound_values.append(wanted)

        sql += " ORDER BY created_at DESC"

        cursor.execute(sql, bound_values)

        found = []
        for record in cursor.fetchall():
            # The result column is decoded only when it comes back as text;
            # any other value is passed through untouched.
            payload = record["result"]
            if isinstance(payload, str):
                payload = json.loads(payload)

            found.append(AnnotationResponse(
                id=record["id"],
                task_id=record["task_id"],
                user_id=record["user_id"],
                result=payload,
                created_at=record["created_at"],
                updated_at=record["updated_at"],
            ))

        return found
@router.post("", response_model=AnnotationResponse, status_code=status.HTTP_201_CREATED)
async def create_annotation(annotation: AnnotationCreate):
    """
    Store a new annotation for an existing task and return the stored row.

    Args:
        annotation: Creation payload; its task_id, user_id and result are used

    Returns:
        The inserted annotation, read back from the annotations table

    Raises:
        HTTPException: 404 if no task with the payload's task_id exists
    """
    # IDs are "ann_" followed by the first 12 hex digits of a random UUID.
    new_id = "ann_" + uuid.uuid4().hex[:12]

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Refuse to attach an annotation to a task that is not in the tasks table.
        cursor.execute("SELECT id FROM tasks WHERE id = ?", (annotation.task_id,))
        task_row = cursor.fetchone()
        if not task_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id '{annotation.task_id}' not found"
            )

        # The result is stored as JSON text.
        serialized = json.dumps(annotation.result)
        insert_values = (
            new_id,
            annotation.task_id,
            annotation.user_id,
            serialized,
        )
        cursor.execute(
            """
            INSERT INTO annotations (id, task_id, user_id, result)
            VALUES (?, ?, ?, ?)
            """,
            insert_values,
        )

        # Read the row back: created_at and updated_at are not part of the
        # INSERT, so their values have to come from the database
        # (presumably column defaults -- confirm against the schema).
        cursor.execute(
            """
            SELECT id, task_id, user_id, result, created_at, updated_at
            FROM annotations
            WHERE id = ?
            """,
            (new_id,),
        )
        stored = cursor.fetchone()

        # Decode the result only when it comes back as text.
        payload = stored["result"]
        if isinstance(payload, str):
            payload = json.loads(payload)

        return AnnotationResponse(
            id=stored["id"],
            task_id=stored["task_id"],
            user_id=stored["user_id"],
            result=payload,
            created_at=stored["created_at"],
            updated_at=stored["updated_at"],
        )
@router.get("/{annotation_id}", response_model=AnnotationResponse)
async def get_annotation(annotation_id: str):
    """
    Look up a single annotation by its ID.

    Args:
        annotation_id: ID of the annotation to fetch

    Returns:
        The matching annotation

    Raises:
        HTTPException: 404 if no annotation has this ID
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT id, task_id, user_id, result, created_at, updated_at
            FROM annotations
            WHERE id = ?
            """,
            (annotation_id,),
        )
        record = cursor.fetchone()

        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Annotation with id '{annotation_id}' not found"
            )

        # Decode the result only when it comes back as text.
        payload = record["result"]
        if isinstance(payload, str):
            payload = json.loads(payload)

        return AnnotationResponse(
            id=record["id"],
            task_id=record["task_id"],
            user_id=record["user_id"],
            result=payload,
            created_at=record["created_at"],
            updated_at=record["updated_at"],
        )
@router.put("/{annotation_id}", response_model=AnnotationResponse)
async def update_annotation(annotation_id: str, annotation: AnnotationUpdate):
    """
    Overwrite the result of an existing annotation and return the updated row.

    Args:
        annotation_id: ID of the annotation to modify
        annotation: Update payload; its result replaces the stored one

    Returns:
        The annotation as stored after the update

    Raises:
        HTTPException: 404 if no annotation has this ID
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Bail out with 404 before touching anything if the ID is unknown.
        cursor.execute("SELECT id FROM annotations WHERE id = ?", (annotation_id,))
        existing = cursor.fetchone()
        if not existing:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Annotation with id '{annotation_id}' not found"
            )

        # Only result changes; updated_at is refreshed by the database itself.
        serialized = json.dumps(annotation.result)
        cursor.execute(
            """
            UPDATE annotations
            SET result = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (serialized, annotation_id),
        )

        # Read the row back so the response carries the new updated_at value.
        cursor.execute(
            """
            SELECT id, task_id, user_id, result, created_at, updated_at
            FROM annotations
            WHERE id = ?
            """,
            (annotation_id,),
        )
        refreshed = cursor.fetchone()

        # Decode the result only when it comes back as text.
        payload = refreshed["result"]
        if isinstance(payload, str):
            payload = json.loads(payload)

        return AnnotationResponse(
            id=refreshed["id"],
            task_id=refreshed["task_id"],
            user_id=refreshed["user_id"],
            result=payload,
            created_at=refreshed["created_at"],
            updated_at=refreshed["updated_at"],
        )
@router.get("/tasks/{task_id}/annotations", response_model=List[AnnotationResponse])
async def get_task_annotations(task_id: str):
    """
    Return every annotation attached to one task, newest first.

    Args:
        task_id: ID of the task whose annotations are requested

    Returns:
        List of the task's annotations ordered by created_at descending

    Raises:
        HTTPException: 404 if no task has this ID
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # An unknown task is reported as 404 rather than as an empty list.
        cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
        task_row = cursor.fetchone()
        if not task_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id '{task_id}' not found"
            )

        cursor.execute(
            """
            SELECT id, task_id, user_id, result, created_at, updated_at
            FROM annotations
            WHERE task_id = ?
            ORDER BY created_at DESC
            """,
            (task_id,),
        )

        found = []
        for record in cursor.fetchall():
            # Decode the result only when it comes back as text.
            payload = record["result"]
            if isinstance(payload, str):
                payload = json.loads(payload)

            found.append(AnnotationResponse(
                id=record["id"],
                task_id=record["task_id"],
                user_id=record["user_id"],
                result=payload,
                created_at=record["created_at"],
                updated_at=record["updated_at"],
            ))

        return found
|