# annotation.py (8.6 KB)
  1. """
  2. Annotation API router.
  3. Provides CRUD endpoints for annotation management.
  4. """
  5. import uuid
  6. import json
  7. from typing import List, Optional
  8. from fastapi import APIRouter, HTTPException, status, Query
  9. from database import get_db_connection
  10. from schemas.annotation import AnnotationCreate, AnnotationUpdate, AnnotationResponse
  11. from models import Annotation
  12. router = APIRouter(
  13. prefix="/api/annotations",
  14. tags=["annotations"]
  15. )
  16. @router.get("", response_model=List[AnnotationResponse])
  17. async def list_annotations(
  18. task_id: Optional[str] = Query(None, description="Filter by task ID"),
  19. user_id: Optional[str] = Query(None, description="Filter by user ID")
  20. ):
  21. """
  22. List all annotations with optional filters.
  23. Args:
  24. task_id: Optional task ID filter
  25. user_id: Optional user ID filter
  26. Returns:
  27. List of annotations matching the filters
  28. """
  29. with get_db_connection() as conn:
  30. cursor = conn.cursor()
  31. # Build query with filters
  32. query = """
  33. SELECT
  34. id,
  35. task_id,
  36. user_id,
  37. result,
  38. created_at,
  39. updated_at
  40. FROM annotations
  41. WHERE 1=1
  42. """
  43. params = []
  44. if task_id:
  45. query += " AND task_id = ?"
  46. params.append(task_id)
  47. if user_id:
  48. query += " AND user_id = ?"
  49. params.append(user_id)
  50. query += " ORDER BY created_at DESC"
  51. cursor.execute(query, params)
  52. rows = cursor.fetchall()
  53. annotations = []
  54. for row in rows:
  55. # Parse JSON result
  56. result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
  57. annotations.append(AnnotationResponse(
  58. id=row["id"],
  59. task_id=row["task_id"],
  60. user_id=row["user_id"],
  61. result=result,
  62. created_at=row["created_at"],
  63. updated_at=row["updated_at"]
  64. ))
  65. return annotations
  66. @router.post("", response_model=AnnotationResponse, status_code=status.HTTP_201_CREATED)
  67. async def create_annotation(annotation: AnnotationCreate):
  68. """
  69. Create a new annotation.
  70. Args:
  71. annotation: Annotation creation data
  72. Returns:
  73. Created annotation with generated ID
  74. Raises:
  75. HTTPException: 404 if task not found
  76. """
  77. # Generate unique ID
  78. annotation_id = f"ann_{uuid.uuid4().hex[:12]}"
  79. with get_db_connection() as conn:
  80. cursor = conn.cursor()
  81. # Verify task exists
  82. cursor.execute("SELECT id FROM tasks WHERE id = ?", (annotation.task_id,))
  83. if not cursor.fetchone():
  84. raise HTTPException(
  85. status_code=status.HTTP_404_NOT_FOUND,
  86. detail=f"Task with id '{annotation.task_id}' not found"
  87. )
  88. # Serialize result to JSON
  89. result_json = json.dumps(annotation.result)
  90. # Insert new annotation
  91. cursor.execute("""
  92. INSERT INTO annotations (id, task_id, user_id, result)
  93. VALUES (?, ?, ?, ?)
  94. """, (
  95. annotation_id,
  96. annotation.task_id,
  97. annotation.user_id,
  98. result_json
  99. ))
  100. # Fetch the created annotation
  101. cursor.execute("""
  102. SELECT id, task_id, user_id, result, created_at, updated_at
  103. FROM annotations
  104. WHERE id = ?
  105. """, (annotation_id,))
  106. row = cursor.fetchone()
  107. # Parse JSON result
  108. result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
  109. return AnnotationResponse(
  110. id=row["id"],
  111. task_id=row["task_id"],
  112. user_id=row["user_id"],
  113. result=result,
  114. created_at=row["created_at"],
  115. updated_at=row["updated_at"]
  116. )
  117. @router.get("/{annotation_id}", response_model=AnnotationResponse)
  118. async def get_annotation(annotation_id: str):
  119. """
  120. Get annotation by ID.
  121. Args:
  122. annotation_id: Annotation unique identifier
  123. Returns:
  124. Annotation details
  125. Raises:
  126. HTTPException: 404 if annotation not found
  127. """
  128. with get_db_connection() as conn:
  129. cursor = conn.cursor()
  130. # Get annotation
  131. cursor.execute("""
  132. SELECT id, task_id, user_id, result, created_at, updated_at
  133. FROM annotations
  134. WHERE id = ?
  135. """, (annotation_id,))
  136. row = cursor.fetchone()
  137. if not row:
  138. raise HTTPException(
  139. status_code=status.HTTP_404_NOT_FOUND,
  140. detail=f"Annotation with id '{annotation_id}' not found"
  141. )
  142. # Parse JSON result
  143. result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
  144. return AnnotationResponse(
  145. id=row["id"],
  146. task_id=row["task_id"],
  147. user_id=row["user_id"],
  148. result=result,
  149. created_at=row["created_at"],
  150. updated_at=row["updated_at"]
  151. )
  152. @router.put("/{annotation_id}", response_model=AnnotationResponse)
  153. async def update_annotation(annotation_id: str, annotation: AnnotationUpdate):
  154. """
  155. Update an existing annotation.
  156. Args:
  157. annotation_id: Annotation unique identifier
  158. annotation: Annotation update data
  159. Returns:
  160. Updated annotation details
  161. Raises:
  162. HTTPException: 404 if annotation not found
  163. """
  164. with get_db_connection() as conn:
  165. cursor = conn.cursor()
  166. # Check if annotation exists
  167. cursor.execute("SELECT id FROM annotations WHERE id = ?", (annotation_id,))
  168. if not cursor.fetchone():
  169. raise HTTPException(
  170. status_code=status.HTTP_404_NOT_FOUND,
  171. detail=f"Annotation with id '{annotation_id}' not found"
  172. )
  173. # Serialize result to JSON
  174. result_json = json.dumps(annotation.result)
  175. # Update annotation
  176. cursor.execute("""
  177. UPDATE annotations
  178. SET result = ?, updated_at = CURRENT_TIMESTAMP
  179. WHERE id = ?
  180. """, (result_json, annotation_id))
  181. # Fetch and return updated annotation
  182. cursor.execute("""
  183. SELECT id, task_id, user_id, result, created_at, updated_at
  184. FROM annotations
  185. WHERE id = ?
  186. """, (annotation_id,))
  187. row = cursor.fetchone()
  188. # Parse JSON result
  189. result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
  190. return AnnotationResponse(
  191. id=row["id"],
  192. task_id=row["task_id"],
  193. user_id=row["user_id"],
  194. result=result,
  195. created_at=row["created_at"],
  196. updated_at=row["updated_at"]
  197. )
  198. @router.get("/tasks/{task_id}/annotations", response_model=List[AnnotationResponse])
  199. async def get_task_annotations(task_id: str):
  200. """
  201. Get all annotations for a specific task.
  202. Args:
  203. task_id: Task unique identifier
  204. Returns:
  205. List of annotations belonging to the task
  206. Raises:
  207. HTTPException: 404 if task not found
  208. """
  209. with get_db_connection() as conn:
  210. cursor = conn.cursor()
  211. # Verify task exists
  212. cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
  213. if not cursor.fetchone():
  214. raise HTTPException(
  215. status_code=status.HTTP_404_NOT_FOUND,
  216. detail=f"Task with id '{task_id}' not found"
  217. )
  218. # Get all annotations for the task
  219. cursor.execute("""
  220. SELECT id, task_id, user_id, result, created_at, updated_at
  221. FROM annotations
  222. WHERE task_id = ?
  223. ORDER BY created_at DESC
  224. """, (task_id,))
  225. rows = cursor.fetchall()
  226. annotations = []
  227. for row in rows:
  228. # Parse JSON result
  229. result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
  230. annotations.append(AnnotationResponse(
  231. id=row["id"],
  232. task_id=row["task_id"],
  233. user_id=row["user_id"],
  234. result=result,
  235. created_at=row["created_at"],
  236. updated_at=row["updated_at"]
  237. ))
  238. return annotations