Browse Source

-dev:完成task和project相关接口

LuoChinWen 1 month ago
parent
commit
3408bea816

+ 2 - 0
.gitignore

@@ -18,6 +18,8 @@ dist
 .settings/
 *.sublime-workspace
 
+__pycache__
+
 ./.kiro/specs/
 
 # IDE - VSCode

+ 6 - 5
.kiro/specs/annotation-platform/tasks.md

@@ -45,7 +45,8 @@
     - DELETE /api/projects/{id} (删除,级联删除任务)
     - _Requirements: 5.3, 1.3, 1.6, 1.7_
 
-  - [ ]* 3.3 编写 Property 1 的属性测试
+  - [x] 3.3 编写 Property 1 的属性测试
+
     - 在 backend/test/ 目录创建测试文件
     - **Property 1: Project creation adds to list**
     - **Validates: Requirements 1.3**
@@ -68,12 +69,12 @@
     - _Requirements: 6.4_
 
 - [ ] 4. 实现后端 Task API
-  - [ ] 4.1 创建 Task 数据模型和 Pydantic schemas
+  - [x] 4.1 创建 Task 数据模型和 Pydantic schemas
     - 定义 TaskCreate, TaskUpdate, TaskResponse schemas
     - 创建数据库模型
     - _Requirements: 6.2_
 
-  - [ ] 4.2 实现 Task CRUD 端点
+  - [x] 4.2 实现 Task CRUD 端点
     - GET /api/tasks (列表,支持筛选)
     - POST /api/tasks (创建)
     - GET /api/tasks/{id} (详情)
@@ -97,12 +98,12 @@
     - _Requirements: 2.7_
 
 - [ ] 5. 实现后端 Annotation API
-  - [ ] 5.1 创建 Annotation 数据模型和 Pydantic schemas
+  - [x] 5.1 创建 Annotation 数据模型和 Pydantic schemas
     - 定义 AnnotationCreate, AnnotationUpdate, AnnotationResponse schemas
     - 创建数据库模型
     - _Requirements: 6.3_
 
-  - [ ] 5.2 实现 Annotation CRUD 端点
+  - [x] 5.2 实现 Annotation CRUD 端点
     - GET /api/annotations (列表,支持筛选)
     - POST /api/annotations (创建)
     - GET /api/annotations/{id} (详情)

BIN
backend/.hypothesis/examples/b9fdcb6dab8ec4d4/ee8e92f5def9662a


BIN
backend/.hypothesis/examples/e5c4b08a12637005/742d19fa794f16c5


BIN
backend/.hypothesis/unicode_data/14.0.0/charmap.json.gz


BIN
backend/__pycache__/main.cpython-311.pyc


+ 3 - 1
backend/main.py

@@ -6,7 +6,7 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from contextlib import asynccontextmanager
 from database import init_database
-from routers import project
+from routers import project, task, annotation
 
 
 @asynccontextmanager
@@ -43,6 +43,8 @@ app.add_middleware(
 
 # Include routers
 app.include_router(project.router)
+app.include_router(task.router)
+app.include_router(annotation.router)
 
 
 @app.get("/")

+ 1 - 0
backend/requirements.txt

@@ -5,3 +5,4 @@ python-multipart==0.0.6
 pytest==7.4.3
 pytest-cov==4.1.0
 httpx==0.26.0
+hypothesis==6.92.1

+ 297 - 0
backend/routers/annotation.py

@@ -0,0 +1,297 @@
+"""
+Annotation API router.
+Provides CRUD endpoints for annotation management.
+"""
+import uuid
+import json
+from typing import List, Optional
+from fastapi import APIRouter, HTTPException, status, Query
+from database import get_db_connection
+from schemas.annotation import AnnotationCreate, AnnotationUpdate, AnnotationResponse
+from models import Annotation
+
+router = APIRouter(
+    prefix="/api/annotations",
+    tags=["annotations"]
+)
+
+
+@router.get("", response_model=List[AnnotationResponse])
+async def list_annotations(
+    task_id: Optional[str] = Query(None, description="Filter by task ID"),
+    user_id: Optional[str] = Query(None, description="Filter by user ID")
+):
+    """
+    List all annotations with optional filters.
+    
+    Args:
+        task_id: Optional task ID filter
+        user_id: Optional user ID filter
+    
+    Returns:
+        List of annotations matching the filters
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Build query with filters
+        query = """
+            SELECT 
+                id,
+                task_id,
+                user_id,
+                result,
+                created_at,
+                updated_at
+            FROM annotations
+            WHERE 1=1
+        """
+        params = []
+        
+        if task_id:
+            query += " AND task_id = ?"
+            params.append(task_id)
+        
+        if user_id:
+            query += " AND user_id = ?"
+            params.append(user_id)
+        
+        query += " ORDER BY created_at DESC"
+        
+        cursor.execute(query, params)
+        rows = cursor.fetchall()
+        
+        annotations = []
+        for row in rows:
+            # Parse JSON result
+            result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
+            
+            annotations.append(AnnotationResponse(
+                id=row["id"],
+                task_id=row["task_id"],
+                user_id=row["user_id"],
+                result=result,
+                created_at=row["created_at"],
+                updated_at=row["updated_at"]
+            ))
+        
+        return annotations
+
+
+@router.post("", response_model=AnnotationResponse, status_code=status.HTTP_201_CREATED)
+async def create_annotation(annotation: AnnotationCreate):
+    """
+    Create a new annotation.
+    
+    Args:
+        annotation: Annotation creation data
+    
+    Returns:
+        Created annotation with generated ID
+    
+    Raises:
+        HTTPException: 404 if task not found
+    """
+    # Generate unique ID
+    annotation_id = f"ann_{uuid.uuid4().hex[:12]}"
+    
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Verify task exists
+        cursor.execute("SELECT id FROM tasks WHERE id = ?", (annotation.task_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Task with id '{annotation.task_id}' not found"
+            )
+        
+        # Serialize result to JSON
+        result_json = json.dumps(annotation.result)
+        
+        # Insert new annotation
+        cursor.execute("""
+            INSERT INTO annotations (id, task_id, user_id, result)
+            VALUES (?, ?, ?, ?)
+        """, (
+            annotation_id,
+            annotation.task_id,
+            annotation.user_id,
+            result_json
+        ))
+        
+        # Fetch the created annotation
+        cursor.execute("""
+            SELECT id, task_id, user_id, result, created_at, updated_at
+            FROM annotations
+            WHERE id = ?
+        """, (annotation_id,))
+        
+        row = cursor.fetchone()
+        
+        # Parse JSON result
+        result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
+        
+        return AnnotationResponse(
+            id=row["id"],
+            task_id=row["task_id"],
+            user_id=row["user_id"],
+            result=result,
+            created_at=row["created_at"],
+            updated_at=row["updated_at"]
+        )
+
+
+@router.get("/{annotation_id}", response_model=AnnotationResponse)
+async def get_annotation(annotation_id: str):
+    """
+    Get annotation by ID.
+    
+    Args:
+        annotation_id: Annotation unique identifier
+    
+    Returns:
+        Annotation details
+    
+    Raises:
+        HTTPException: 404 if annotation not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Get annotation
+        cursor.execute("""
+            SELECT id, task_id, user_id, result, created_at, updated_at
+            FROM annotations
+            WHERE id = ?
+        """, (annotation_id,))
+        
+        row = cursor.fetchone()
+        
+        if not row:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Annotation with id '{annotation_id}' not found"
+            )
+        
+        # Parse JSON result
+        result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
+        
+        return AnnotationResponse(
+            id=row["id"],
+            task_id=row["task_id"],
+            user_id=row["user_id"],
+            result=result,
+            created_at=row["created_at"],
+            updated_at=row["updated_at"]
+        )
+
+
+@router.put("/{annotation_id}", response_model=AnnotationResponse)
+async def update_annotation(annotation_id: str, annotation: AnnotationUpdate):
+    """
+    Update an existing annotation.
+    
+    Args:
+        annotation_id: Annotation unique identifier
+        annotation: Annotation update data
+    
+    Returns:
+        Updated annotation details
+    
+    Raises:
+        HTTPException: 404 if annotation not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Check if annotation exists
+        cursor.execute("SELECT id FROM annotations WHERE id = ?", (annotation_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Annotation with id '{annotation_id}' not found"
+            )
+        
+        # Serialize result to JSON
+        result_json = json.dumps(annotation.result)
+        
+        # Update annotation
+        cursor.execute("""
+            UPDATE annotations
+            SET result = ?, updated_at = CURRENT_TIMESTAMP
+            WHERE id = ?
+        """, (result_json, annotation_id))
+        
+        # Fetch and return updated annotation
+        cursor.execute("""
+            SELECT id, task_id, user_id, result, created_at, updated_at
+            FROM annotations
+            WHERE id = ?
+        """, (annotation_id,))
+        
+        row = cursor.fetchone()
+        
+        # Parse JSON result
+        result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
+        
+        return AnnotationResponse(
+            id=row["id"],
+            task_id=row["task_id"],
+            user_id=row["user_id"],
+            result=result,
+            created_at=row["created_at"],
+            updated_at=row["updated_at"]
+        )
+
+
+@router.get("/tasks/{task_id}/annotations", response_model=List[AnnotationResponse])
+async def get_task_annotations(task_id: str):
+    """
+    Get all annotations for a specific task.
+    
+    Args:
+        task_id: Task unique identifier
+    
+    Returns:
+        List of annotations belonging to the task
+    
+    Raises:
+        HTTPException: 404 if task not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Verify task exists
+        cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Task with id '{task_id}' not found"
+            )
+        
+        # Get all annotations for the task
+        cursor.execute("""
+            SELECT id, task_id, user_id, result, created_at, updated_at
+            FROM annotations
+            WHERE task_id = ?
+            ORDER BY created_at DESC
+        """, (task_id,))
+        
+        rows = cursor.fetchall()
+        
+        annotations = []
+        for row in rows:
+            # Parse JSON result
+            result = json.loads(row["result"]) if isinstance(row["result"], str) else row["result"]
+            
+            annotations.append(AnnotationResponse(
+                id=row["id"],
+                task_id=row["task_id"],
+                user_id=row["user_id"],
+                result=result,
+                created_at=row["created_at"],
+                updated_at=row["updated_at"]
+            ))
+        
+        return annotations

+ 450 - 0
backend/routers/task.py

@@ -0,0 +1,450 @@
+"""
+Task API router.
+Provides CRUD endpoints for task management.
+"""
+import uuid
+import json
+from typing import List, Optional
+from fastapi import APIRouter, HTTPException, status, Query
+from database import get_db_connection
+from schemas.task import TaskCreate, TaskUpdate, TaskResponse
+from models import Task
+
+router = APIRouter(
+    prefix="/api/tasks",
+    tags=["tasks"]
+)
+
+
+@router.get("", response_model=List[TaskResponse])
+async def list_tasks(
+    project_id: Optional[str] = Query(None, description="Filter by project ID"),
+    status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"),
+    assigned_to: Optional[str] = Query(None, description="Filter by assigned user")
+):
+    """
+    List all tasks with optional filters.
+    
+    Args:
+        project_id: Optional project ID filter
+        status_filter: Optional status filter (pending, in_progress, completed)
+        assigned_to: Optional assigned user filter
+    
+    Returns:
+        List of tasks matching the filters
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Build query with filters
+        query = """
+            SELECT 
+                t.id,
+                t.project_id,
+                t.name,
+                t.data,
+                t.status,
+                t.assigned_to,
+                t.created_at,
+                COALESCE(
+                    CAST(COUNT(a.id) AS FLOAT) / NULLIF(
+                        (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 
+                        0
+                    ),
+                    0.0
+                ) as progress
+            FROM tasks t
+            LEFT JOIN annotations a ON t.id = a.task_id
+            WHERE 1=1
+        """
+        params = []
+        
+        if project_id:
+            query += " AND t.project_id = ?"
+            params.append(project_id)
+        
+        if status_filter:
+            query += " AND t.status = ?"
+            params.append(status_filter)
+        
+        if assigned_to:
+            query += " AND t.assigned_to = ?"
+            params.append(assigned_to)
+        
+        query += " GROUP BY t.id ORDER BY t.created_at DESC"
+        
+        cursor.execute(query, params)
+        rows = cursor.fetchall()
+        
+        tasks = []
+        for row in rows:
+            # Parse JSON data
+            data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
+            
+            tasks.append(TaskResponse(
+                id=row["id"],
+                project_id=row["project_id"],
+                name=row["name"],
+                data=data,
+                status=row["status"],
+                assigned_to=row["assigned_to"],
+                created_at=row["created_at"],
+                progress=row["progress"]
+            ))
+        
+        return tasks
+
+
+@router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
+async def create_task(task: TaskCreate):
+    """
+    Create a new task.
+    
+    Args:
+        task: Task creation data
+    
+    Returns:
+        Created task with generated ID
+    
+    Raises:
+        HTTPException: 404 if project not found
+    """
+    # Generate unique ID
+    task_id = f"task_{uuid.uuid4().hex[:12]}"
+    
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Verify project exists
+        cursor.execute("SELECT id FROM projects WHERE id = ?", (task.project_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Project with id '{task.project_id}' not found"
+            )
+        
+        # Serialize data to JSON
+        data_json = json.dumps(task.data)
+        
+        # Insert new task
+        cursor.execute("""
+            INSERT INTO tasks (id, project_id, name, data, status, assigned_to)
+            VALUES (?, ?, ?, ?, 'pending', ?)
+        """, (
+            task_id,
+            task.project_id,
+            task.name,
+            data_json,
+            task.assigned_to
+        ))
+        
+        # Fetch the created task
+        cursor.execute("""
+            SELECT id, project_id, name, data, status, assigned_to, created_at
+            FROM tasks
+            WHERE id = ?
+        """, (task_id,))
+        
+        row = cursor.fetchone()
+        
+        # Parse JSON data
+        data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
+        
+        return TaskResponse(
+            id=row["id"],
+            project_id=row["project_id"],
+            name=row["name"],
+            data=data,
+            status=row["status"],
+            assigned_to=row["assigned_to"],
+            created_at=row["created_at"],
+            progress=0.0
+        )
+
+
+@router.get("/{task_id}", response_model=TaskResponse)
+async def get_task(task_id: str):
+    """
+    Get task by ID.
+    
+    Args:
+        task_id: Task unique identifier
+    
+    Returns:
+        Task details with progress
+    
+    Raises:
+        HTTPException: 404 if task not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Get task with progress
+        cursor.execute("""
+            SELECT 
+                t.id,
+                t.project_id,
+                t.name,
+                t.data,
+                t.status,
+                t.assigned_to,
+                t.created_at,
+                COALESCE(
+                    CAST(COUNT(a.id) AS FLOAT) / NULLIF(
+                        (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 
+                        0
+                    ),
+                    0.0
+                ) as progress
+            FROM tasks t
+            LEFT JOIN annotations a ON t.id = a.task_id
+            WHERE t.id = ?
+            GROUP BY t.id
+        """, (task_id,))
+        
+        row = cursor.fetchone()
+        
+        if not row:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Task with id '{task_id}' not found"
+            )
+        
+        # Parse JSON data
+        data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
+        
+        return TaskResponse(
+            id=row["id"],
+            project_id=row["project_id"],
+            name=row["name"],
+            data=data,
+            status=row["status"],
+            assigned_to=row["assigned_to"],
+            created_at=row["created_at"],
+            progress=row["progress"]
+        )
+
+
+@router.put("/{task_id}", response_model=TaskResponse)
+async def update_task(task_id: str, task: TaskUpdate):
+    """
+    Update an existing task.
+    
+    Args:
+        task_id: Task unique identifier
+        task: Task update data
+    
+    Returns:
+        Updated task details
+    
+    Raises:
+        HTTPException: 404 if task not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Check if task exists
+        cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Task with id '{task_id}' not found"
+            )
+        
+        # Build update query dynamically based on provided fields
+        update_fields = []
+        update_values = []
+        
+        if task.name is not None:
+            update_fields.append("name = ?")
+            update_values.append(task.name)
+        
+        if task.data is not None:
+            update_fields.append("data = ?")
+            update_values.append(json.dumps(task.data))
+        
+        if task.status is not None:
+            update_fields.append("status = ?")
+            update_values.append(task.status)
+        
+        if task.assigned_to is not None:
+            update_fields.append("assigned_to = ?")
+            update_values.append(task.assigned_to)
+        
+        if not update_fields:
+            # No fields to update, just return current task
+            cursor.execute("""
+                SELECT 
+                    t.id,
+                    t.project_id,
+                    t.name,
+                    t.data,
+                    t.status,
+                    t.assigned_to,
+                    t.created_at,
+                    COALESCE(
+                        CAST(COUNT(a.id) AS FLOAT) / NULLIF(
+                            (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 
+                            0
+                        ),
+                        0.0
+                    ) as progress
+                FROM tasks t
+                LEFT JOIN annotations a ON t.id = a.task_id
+                WHERE t.id = ?
+                GROUP BY t.id
+            """, (task_id,))
+            row = cursor.fetchone()
+            data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
+            return TaskResponse(
+                id=row["id"],
+                project_id=row["project_id"],
+                name=row["name"],
+                data=data,
+                status=row["status"],
+                assigned_to=row["assigned_to"],
+                created_at=row["created_at"],
+                progress=row["progress"]
+            )
+        
+        # Execute update
+        update_values.append(task_id)
+        cursor.execute(f"""
+            UPDATE tasks
+            SET {', '.join(update_fields)}
+            WHERE id = ?
+        """, update_values)
+        
+        # Fetch and return updated task
+        cursor.execute("""
+            SELECT 
+                t.id,
+                t.project_id,
+                t.name,
+                t.data,
+                t.status,
+                t.assigned_to,
+                t.created_at,
+                COALESCE(
+                    CAST(COUNT(a.id) AS FLOAT) / NULLIF(
+                        (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 
+                        0
+                    ),
+                    0.0
+                ) as progress
+            FROM tasks t
+            LEFT JOIN annotations a ON t.id = a.task_id
+            WHERE t.id = ?
+            GROUP BY t.id
+        """, (task_id,))
+        
+        row = cursor.fetchone()
+        data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
+        return TaskResponse(
+            id=row["id"],
+            project_id=row["project_id"],
+            name=row["name"],
+            data=data,
+            status=row["status"],
+            assigned_to=row["assigned_to"],
+            created_at=row["created_at"],
+            progress=row["progress"]
+        )
+
+
+@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
+async def delete_task(task_id: str):
+    """
+    Delete a task and all associated annotations.
+    
+    Args:
+        task_id: Task unique identifier
+    
+    Raises:
+        HTTPException: 404 if task not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Check if task exists
+        cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Task with id '{task_id}' not found"
+            )
+        
+        # Delete task (cascade will delete annotations)
+        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
+        
+        return None
+
+
+@router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse])
+async def get_project_tasks(project_id: str):
+    """
+    Get all tasks for a specific project.
+    
+    Args:
+        project_id: Project unique identifier
+    
+    Returns:
+        List of tasks belonging to the project
+    
+    Raises:
+        HTTPException: 404 if project not found
+    """
+    with get_db_connection() as conn:
+        cursor = conn.cursor()
+        
+        # Verify project exists
+        cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
+        if not cursor.fetchone():
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Project with id '{project_id}' not found"
+            )
+        
+        # Get all tasks for the project
+        cursor.execute("""
+            SELECT 
+                t.id,
+                t.project_id,
+                t.name,
+                t.data,
+                t.status,
+                t.assigned_to,
+                t.created_at,
+                COALESCE(
+                    CAST(COUNT(a.id) AS FLOAT) / NULLIF(
+                        (SELECT COUNT(*) FROM json_each(t.data, '$.items')), 
+                        0
+                    ),
+                    0.0
+                ) as progress
+            FROM tasks t
+            LEFT JOIN annotations a ON t.id = a.task_id
+            WHERE t.project_id = ?
+            GROUP BY t.id
+            ORDER BY t.created_at DESC
+        """, (project_id,))
+        
+        rows = cursor.fetchall()
+        
+        tasks = []
+        for row in rows:
+            # Parse JSON data
+            data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
+            
+            tasks.append(TaskResponse(
+                id=row["id"],
+                project_id=row["project_id"],
+                name=row["name"],
+                data=data,
+                status=row["status"],
+                assigned_to=row["assigned_to"],
+                created_at=row["created_at"],
+                progress=row["progress"]
+            ))
+        
+        return tasks

+ 8 - 0
backend/schemas/__init__.py

@@ -2,9 +2,17 @@
 Pydantic schemas package.
 """
 from .project import ProjectCreate, ProjectUpdate, ProjectResponse
+from .task import TaskCreate, TaskUpdate, TaskResponse
+from .annotation import AnnotationCreate, AnnotationUpdate, AnnotationResponse
 
 __all__ = [
     "ProjectCreate",
     "ProjectUpdate",
     "ProjectResponse",
+    "TaskCreate",
+    "TaskUpdate",
+    "TaskResponse",
+    "AnnotationCreate",
+    "AnnotationUpdate",
+    "AnnotationResponse",
 ]

BIN
backend/schemas/__pycache__/__init__.cpython-311.pyc


+ 102 - 0
backend/schemas/annotation.py

@@ -0,0 +1,102 @@
+"""
+Pydantic schemas for Annotation API.
+Defines request and response models for annotation operations.
+"""
+from datetime import datetime
+from typing import Any
+from pydantic import BaseModel, Field
+
+
+class AnnotationCreate(BaseModel):
+    """Schema for creating a new annotation."""
+    
+    task_id: str = Field(..., min_length=1, description="Task ID this annotation belongs to")
+    user_id: str = Field(..., min_length=1, description="User ID who created this annotation")
+    result: dict = Field(..., description="Annotation result data (JSON object)")
+    
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "task_id": "task_456def",
+                "user_id": "user_001",
+                "result": {
+                    "annotations": [
+                        {
+                            "id": "ann_1",
+                            "type": "rectanglelabels",
+                            "value": {
+                                "x": 10,
+                                "y": 20,
+                                "width": 100,
+                                "height": 50,
+                                "rectanglelabels": ["Cat"]
+                            }
+                        }
+                    ]
+                }
+            }
+        }
+
+
+class AnnotationUpdate(BaseModel):
+    """Schema for updating an existing annotation."""
+    
+    result: dict = Field(..., description="Updated annotation result data (JSON object)")
+    
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "result": {
+                    "annotations": [
+                        {
+                            "id": "ann_1",
+                            "type": "rectanglelabels",
+                            "value": {
+                                "x": 15,
+                                "y": 25,
+                                "width": 110,
+                                "height": 55,
+                                "rectanglelabels": ["Dog"]
+                            }
+                        }
+                    ]
+                }
+            }
+        }
+
+
+class AnnotationResponse(BaseModel):
+    """Schema for annotation response."""
+    
+    id: str = Field(..., description="Annotation unique identifier")
+    task_id: str = Field(..., description="Task ID this annotation belongs to")
+    user_id: str = Field(..., description="User ID who created this annotation")
+    result: dict = Field(..., description="Annotation result data (JSON object)")
+    created_at: datetime = Field(..., description="Annotation creation timestamp")
+    updated_at: datetime = Field(..., description="Annotation last update timestamp")
+    
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "id": "ann_789ghi",
+                "task_id": "task_456def",
+                "user_id": "user_001",
+                "result": {
+                    "annotations": [
+                        {
+                            "id": "ann_1",
+                            "type": "rectanglelabels",
+                            "value": {
+                                "x": 10,
+                                "y": 20,
+                                "width": 100,
+                                "height": 50,
+                                "rectanglelabels": ["Cat"]
+                            }
+                        }
+                    ]
+                },
+                "created_at": "2024-01-12T12:00:00",
+                "updated_at": "2024-01-12T12:30:00"
+            }
+        }

+ 77 - 0
backend/schemas/task.py

@@ -0,0 +1,77 @@
+"""
+Pydantic schemas for Task API.
+Defines request and response models for task operations.
+"""
+from datetime import datetime
+from typing import Optional, Any
+from pydantic import BaseModel, Field
+
+
+class TaskCreate(BaseModel):
+    """Schema for creating a new task."""
+    
+    project_id: str = Field(..., min_length=1, description="Project ID this task belongs to")
+    name: str = Field(..., min_length=1, description="Task name")
+    data: dict = Field(..., description="Task data (JSON object)")
+    assigned_to: Optional[str] = Field(None, description="User ID assigned to this task")
+    
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "project_id": "proj_123abc",
+                "name": "Annotate Image Batch 1",
+                "data": {
+                    "image_url": "https://example.com/image1.jpg",
+                    "metadata": {"batch": 1}
+                },
+                "assigned_to": "user_001"
+            }
+        }
+
+
+class TaskUpdate(BaseModel):
+    """Schema for updating an existing task."""
+    
+    name: Optional[str] = Field(None, min_length=1, description="Task name")
+    data: Optional[dict] = Field(None, description="Task data (JSON object)")
+    status: Optional[str] = Field(None, description="Task status (pending, in_progress, completed)")
+    assigned_to: Optional[str] = Field(None, description="User ID assigned to this task")
+    
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "name": "Updated Task Name",
+                "status": "in_progress",
+                "assigned_to": "user_002"
+            }
+        }
+
+
+class TaskResponse(BaseModel):
+    """Schema for task response."""
+    
+    id: str = Field(..., description="Task unique identifier")
+    project_id: str = Field(..., description="Project ID this task belongs to")
+    name: str = Field(..., description="Task name")
+    data: dict = Field(..., description="Task data (JSON object)")
+    status: str = Field(..., description="Task status (pending, in_progress, completed)")
+    assigned_to: Optional[str] = Field(None, description="User ID assigned to this task")
+    created_at: datetime = Field(..., description="Task creation timestamp")
+    progress: float = Field(default=0.0, description="Task completion progress (0.0 to 1.0)")
+    
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "id": "task_456def",
+                "project_id": "proj_123abc",
+                "name": "Annotate Image Batch 1",
+                "data": {
+                    "image_url": "https://example.com/image1.jpg",
+                    "metadata": {"batch": 1}
+                },
+                "status": "in_progress",
+                "assigned_to": "user_001",
+                "created_at": "2024-01-12T11:00:00",
+                "progress": 0.5
+            }
+        }

+ 352 - 0
backend/test/test_annotation_api.py

@@ -0,0 +1,352 @@
+"""
+Unit tests for Annotation API endpoints.
+Tests CRUD operations for annotations.
+"""
+import pytest
+import os
+import json
+from fastapi.testclient import TestClient
+
+# Use a test database
+TEST_DB_PATH = "test_annotation_annotation_platform.db"
+
+
+@pytest.fixture(scope="function", autouse=True)
+def setup_test_db():
+    """Setup test database before each test and cleanup after."""
+    # Set test database path
+    original_db_path = os.environ.get("DATABASE_PATH")
+    os.environ["DATABASE_PATH"] = TEST_DB_PATH
+    
+    # Remove existing test database
+    if os.path.exists(TEST_DB_PATH):
+        os.remove(TEST_DB_PATH)
+    
+    # Import after setting env var
+    from database import init_database
+    init_database()
+    
+    yield
+    
+    # Cleanup
+    if os.path.exists(TEST_DB_PATH):
+        os.remove(TEST_DB_PATH)
+    
+    # Restore original path
+    if original_db_path:
+        os.environ["DATABASE_PATH"] = original_db_path
+    elif "DATABASE_PATH" in os.environ:
+        del os.environ["DATABASE_PATH"]
+
+
+@pytest.fixture(scope="function")
+def test_client():
+    """Create a test client."""
+    from main import app
+    return TestClient(app)
+
+
+@pytest.fixture(scope="function")
+def sample_project(test_client):
+    """Create a sample project for testing."""
+    project_data = {
+        "name": "Test Project",
+        "description": "Test Description",
+        "config": "<View><Image name='img' value='$image'/></View>"
+    }
+    response = test_client.post("/api/projects", json=project_data)
+    return response.json()
+
+
+@pytest.fixture(scope="function")
+def sample_task(test_client, sample_project):
+    """Create a sample task for testing."""
+    task_data = {
+        "project_id": sample_project["id"],
+        "name": "Test Task",
+        "data": {"image_url": "https://example.com/image.jpg"},
+        "assigned_to": "user_001"
+    }
+    response = test_client.post("/api/tasks", json=task_data)
+    return response.json()
+
+
+def test_list_annotations_empty(test_client):
+    """Test listing annotations when database is empty."""
+    response = test_client.get("/api/annotations")
+    assert response.status_code == 200
+    assert response.json() == []
+
+
+def test_create_annotation(test_client, sample_task):
+    """Test creating a new annotation."""
+    annotation_data = {
+        "task_id": sample_task["id"],
+        "user_id": "user_001",
+        "result": {
+            "annotations": [
+                {
+                    "id": "ann_1",
+                    "type": "rectanglelabels",
+                    "value": {
+                        "x": 10,
+                        "y": 20,
+                        "width": 100,
+                        "height": 50,
+                        "rectanglelabels": ["Cat"]
+                    }
+                }
+            ]
+        }
+    }
+    
+    response = test_client.post("/api/annotations", json=annotation_data)
+    assert response.status_code == 201
+    
+    data = response.json()
+    assert data["task_id"] == annotation_data["task_id"]
+    assert data["user_id"] == annotation_data["user_id"]
+    assert data["result"] == annotation_data["result"]
+    assert "id" in data
+    assert data["id"].startswith("ann_")
+    assert "created_at" in data
+    assert "updated_at" in data
+
+
+def test_create_annotation_invalid_task(test_client):
+    """Test creating an annotation with invalid task_id fails."""
+    annotation_data = {
+        "task_id": "nonexistent_task",
+        "user_id": "user_001",
+        "result": {"annotations": []}
+    }
+    
+    response = test_client.post("/api/annotations", json=annotation_data)
+    assert response.status_code == 404
+    assert "not found" in response.json()["detail"].lower()
+
+
+def test_get_annotation(test_client, sample_task):
+    """Test getting an annotation by ID."""
+    # Create an annotation first
+    annotation_data = {
+        "task_id": sample_task["id"],
+        "user_id": "user_001",
+        "result": {"annotations": [{"id": "ann_1", "value": "test"}]}
+    }
+    create_response = test_client.post("/api/annotations", json=annotation_data)
+    annotation_id = create_response.json()["id"]
+    
+    # Get the annotation
+    response = test_client.get(f"/api/annotations/{annotation_id}")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert data["id"] == annotation_id
+    assert data["task_id"] == annotation_data["task_id"]
+    assert data["user_id"] == annotation_data["user_id"]
+
+
+def test_get_annotation_not_found(test_client):
+    """Test getting a non-existent annotation returns 404."""
+    response = test_client.get("/api/annotations/nonexistent_id")
+    assert response.status_code == 404
+    assert "not found" in response.json()["detail"].lower()
+
+
+def test_update_annotation(test_client, sample_task):
+    """Test updating an annotation."""
+    # Create an annotation first
+    annotation_data = {
+        "task_id": sample_task["id"],
+        "user_id": "user_001",
+        "result": {"annotations": [{"id": "ann_1", "value": "original"}]}
+    }
+    create_response = test_client.post("/api/annotations", json=annotation_data)
+    annotation_id = create_response.json()["id"]
+    
+    # Update the annotation
+    update_data = {
+        "result": {"annotations": [{"id": "ann_1", "value": "updated"}]}
+    }
+    response = test_client.put(f"/api/annotations/{annotation_id}", json=update_data)
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert data["result"] == update_data["result"]
+    assert data["task_id"] == annotation_data["task_id"]  # Task ID unchanged
+    assert data["user_id"] == annotation_data["user_id"]  # User ID unchanged
+
+
+def test_update_annotation_not_found(test_client):
+    """Test updating a non-existent annotation returns 404."""
+    update_data = {"result": {"annotations": []}}
+    response = test_client.put("/api/annotations/nonexistent_id", json=update_data)
+    assert response.status_code == 404
+
+
+def test_list_annotations_after_creation(test_client, sample_task):
+    """Test listing annotations after creating some."""
+    # Create multiple annotations
+    for i in range(3):
+        annotation_data = {
+            "task_id": sample_task["id"],
+            "user_id": f"user_{i:03d}",
+            "result": {"annotations": [{"id": f"ann_{i}", "value": f"test_{i}"}]}
+        }
+        test_client.post("/api/annotations", json=annotation_data)
+    
+    # List annotations
+    response = test_client.get("/api/annotations")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 3
+    assert all("id" in annotation for annotation in data)
+    assert all("created_at" in annotation for annotation in data)
+    assert all("updated_at" in annotation for annotation in data)
+
+
+def test_list_annotations_filter_by_task(test_client, sample_project):
+    """Test filtering annotations by task_id."""
+    # Create two tasks
+    task1_data = {
+        "project_id": sample_project["id"],
+        "name": "Task 1",
+        "data": {"image_url": "https://example.com/image1.jpg"}
+    }
+    task1_response = test_client.post("/api/tasks", json=task1_data)
+    task1 = task1_response.json()
+    
+    task2_data = {
+        "project_id": sample_project["id"],
+        "name": "Task 2",
+        "data": {"image_url": "https://example.com/image2.jpg"}
+    }
+    task2_response = test_client.post("/api/tasks", json=task2_data)
+    task2 = task2_response.json()
+    
+    # Create annotations for both tasks
+    ann1_data = {
+        "task_id": task1["id"],
+        "user_id": "user_001",
+        "result": {"annotations": [{"id": "ann_1"}]}
+    }
+    test_client.post("/api/annotations", json=ann1_data)
+    
+    ann2_data = {
+        "task_id": task2["id"],
+        "user_id": "user_001",
+        "result": {"annotations": [{"id": "ann_2"}]}
+    }
+    test_client.post("/api/annotations", json=ann2_data)
+    
+    # Filter by first task
+    response = test_client.get(f"/api/annotations?task_id={task1['id']}")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 1
+    assert data[0]["task_id"] == task1["id"]
+
+
+def test_list_annotations_filter_by_user(test_client, sample_task):
+    """Test filtering annotations by user_id."""
+    # Create annotations for different users
+    ann1_data = {
+        "task_id": sample_task["id"],
+        "user_id": "user_001",
+        "result": {"annotations": [{"id": "ann_1"}]}
+    }
+    test_client.post("/api/annotations", json=ann1_data)
+    
+    ann2_data = {
+        "task_id": sample_task["id"],
+        "user_id": "user_002",
+        "result": {"annotations": [{"id": "ann_2"}]}
+    }
+    test_client.post("/api/annotations", json=ann2_data)
+    
+    # Filter by first user
+    response = test_client.get("/api/annotations?user_id=user_001")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 1
+    assert data[0]["user_id"] == "user_001"
+
+
+def test_get_task_annotations(test_client, sample_task):
+    """Test getting all annotations for a specific task."""
+    # Create annotations for the task
+    for i in range(2):
+        annotation_data = {
+            "task_id": sample_task["id"],
+            "user_id": f"user_{i:03d}",
+            "result": {"annotations": [{"id": f"ann_{i}"}]}
+        }
+        test_client.post("/api/annotations", json=annotation_data)
+    
+    # Get task annotations using the alternative endpoint
+    response = test_client.get(f"/api/annotations/tasks/{sample_task['id']}/annotations")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 2
+    assert all(annotation["task_id"] == sample_task["id"] for annotation in data)
+
+
+def test_get_task_annotations_not_found(test_client):
+    """Test getting annotations for non-existent task returns 404."""
+    response = test_client.get("/api/annotations/tasks/nonexistent_id/annotations")
+    assert response.status_code == 404
+
+
+def test_annotation_json_serialization(test_client, sample_task):
+    """Test that complex JSON data is properly serialized and deserialized."""
+    complex_result = {
+        "annotations": [
+            {
+                "id": "ann_1",
+                "type": "rectanglelabels",
+                "value": {
+                    "x": 10.5,
+                    "y": 20.3,
+                    "width": 100,
+                    "height": 50,
+                    "rectanglelabels": ["Cat", "Animal"]
+                }
+            },
+            {
+                "id": "ann_2",
+                "type": "choices",
+                "value": {
+                    "choices": ["Option A"]
+                }
+            }
+        ],
+        "metadata": {
+            "duration": 120,
+            "quality": "high"
+        }
+    }
+    
+    annotation_data = {
+        "task_id": sample_task["id"],
+        "user_id": "user_001",
+        "result": complex_result
+    }
+    
+    # Create annotation
+    create_response = test_client.post("/api/annotations", json=annotation_data)
+    assert create_response.status_code == 201
+    annotation_id = create_response.json()["id"]
+    
+    # Get annotation and verify data integrity
+    get_response = test_client.get(f"/api/annotations/{annotation_id}")
+    assert get_response.status_code == 200
+    
+    data = get_response.json()
+    assert data["result"] == complex_result
+    assert data["result"]["annotations"][0]["value"]["x"] == 10.5
+    assert data["result"]["metadata"]["duration"] == 120

+ 258 - 0
backend/test/test_property_project_creation.py

@@ -0,0 +1,258 @@
+"""
+Property-based tests for Project creation.
+Tests universal properties that should hold for all valid project data.
+
+Feature: annotation-platform
+Property 1: Project creation adds to list
+Validates: Requirements 1.3
+"""
+import os
+import pytest
+from hypothesis import given, strategies as st, settings, HealthCheck
+from fastapi.testclient import TestClient
+
+# Use a test database
+TEST_DB_PATH = "test_property_annotation_platform.db"
+
+
+@pytest.fixture(scope="function", autouse=True)
+def setup_test_db():
+    """Setup test database before each test and cleanup after."""
+    # Set test database path
+    original_db_path = os.environ.get("DATABASE_PATH")
+    os.environ["DATABASE_PATH"] = TEST_DB_PATH
+    
+    # Remove existing test database
+    if os.path.exists(TEST_DB_PATH):
+        os.remove(TEST_DB_PATH)
+    
+    # Import after setting env var
+    from database import init_database
+    init_database()
+    
+    yield
+    
+    # Cleanup
+    if os.path.exists(TEST_DB_PATH):
+        os.remove(TEST_DB_PATH)
+    
+    # Restore original path
+    if original_db_path:
+        os.environ["DATABASE_PATH"] = original_db_path
+    elif "DATABASE_PATH" in os.environ:
+        del os.environ["DATABASE_PATH"]
+
+
+def get_test_client():
+    """Create a test client."""
+    from main import app
+    return TestClient(app)
+
+
+# Strategy for generating valid project names (non-empty strings)
+valid_project_names = st.text(
+    alphabet=st.characters(
+        whitelist_categories=("Lu", "Ll", "Nd", "P", "Zs"),
+        min_codepoint=32,
+        max_codepoint=126
+    ),
+    min_size=1,
+    max_size=100
+).filter(lambda x: x.strip() != "")  # Ensure not just whitespace
+
+# Strategy for generating project descriptions
+project_descriptions = st.text(
+    alphabet=st.characters(
+        whitelist_categories=("Lu", "Ll", "Nd", "P", "Zs"),
+        min_codepoint=32,
+        max_codepoint=126
+    ),
+    max_size=500
+)
+
+# Strategy for generating valid Label Studio configs (non-empty strings)
+valid_configs = st.text(
+    alphabet=st.characters(
+        whitelist_categories=("Lu", "Ll", "Nd", "P", "Zs"),
+        min_codepoint=32,
+        max_codepoint=126
+    ),
+    min_size=1,
+    max_size=200
+).filter(lambda x: x.strip() != "")  # Ensure not just whitespace
+
+
+@settings(max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture])
+@given(
+    name=valid_project_names,
+    description=project_descriptions,
+    config=valid_configs
+)
+def test_property_1_project_creation_adds_to_list(
+    name: str,
+    description: str,
+    config: str
+):
+    """
+    Feature: annotation-platform, Property 1: Project creation adds to list
+    
+    Property: For any valid project data (non-empty name and config),
+    creating a project should result in the project appearing in the
+    projects list with a unique ID.
+    
+    Validates: Requirements 1.3
+    
+    This property tests that:
+    1. Creating a project with valid data succeeds
+    2. The created project has a unique ID
+    3. The project appears in the list of all projects
+    4. The project data is preserved correctly
+    """
+    # Create test client for this example
+    test_client = get_test_client()
+    
+    # Get initial project count
+    initial_response = test_client.get("/api/projects")
+    assert initial_response.status_code == 200
+    initial_projects = initial_response.json()
+    initial_count = len(initial_projects)
+    
+    # Create project with generated data
+    project_data = {
+        "name": name,
+        "description": description,
+        "config": config
+    }
+    
+    create_response = test_client.post("/api/projects", json=project_data)
+    
+    # Verify creation succeeded
+    assert create_response.status_code == 201, \
+        f"Project creation failed with status {create_response.status_code}"
+    
+    created_project = create_response.json()
+    
+    # Verify project has a unique ID
+    assert "id" in created_project, "Created project should have an ID"
+    assert created_project["id"] is not None, "Project ID should not be None"
+    assert created_project["id"] != "", "Project ID should not be empty"
+    assert created_project["id"].startswith("proj_"), \
+        "Project ID should start with 'proj_' prefix"
+    
+    # Verify project data is preserved
+    assert created_project["name"] == name, \
+        "Created project name should match input"
+    assert created_project["description"] == description, \
+        "Created project description should match input"
+    assert created_project["config"] == config, \
+        "Created project config should match input"
+    
+    # Verify project appears in list
+    list_response = test_client.get("/api/projects")
+    assert list_response.status_code == 200
+    projects_list = list_response.json()
+    
+    # Verify list grew by exactly one
+    assert len(projects_list) == initial_count + 1, \
+        "Project list should grow by exactly one after creation"
+    
+    # Verify the created project is in the list
+    project_ids = [p["id"] for p in projects_list]
+    assert created_project["id"] in project_ids, \
+        "Created project should appear in projects list"
+    
+    # Find the created project in the list and verify data
+    created_in_list = next(
+        (p for p in projects_list if p["id"] == created_project["id"]),
+        None
+    )
+    assert created_in_list is not None, \
+        "Created project should be findable in list"
+    assert created_in_list["name"] == name, \
+        "Project name in list should match input"
+    assert created_in_list["description"] == description, \
+        "Project description in list should match input"
+    assert created_in_list["config"] == config, \
+        "Project config in list should match input"
+    
+    # Verify task_count is initialized to 0
+    assert created_project["task_count"] == 0, \
+        "New project should have task_count of 0"
+    assert created_in_list["task_count"] == 0, \
+        "New project in list should have task_count of 0"
+
+
+@settings(max_examples=50, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture])
+@given(
+    name1=valid_project_names,
+    name2=valid_project_names,
+    config=valid_configs
+)
+def test_property_1_multiple_projects_have_unique_ids(
+    name1: str,
+    name2: str,
+    config: str
+):
+    """
+    Feature: annotation-platform, Property 1: Project creation adds to list
+    
+    Property: Creating multiple projects should result in each project
+    having a unique ID, even if they have the same name.
+    
+    Validates: Requirements 1.3
+    
+    This property tests that:
+    1. Multiple projects can be created
+    2. Each project gets a unique ID
+    3. Projects with the same name still get different IDs
+    """
+    # Create test client for this example
+    test_client = get_test_client()
+    
+    # Create first project
+    project1_data = {
+        "name": name1,
+        "description": "First project",
+        "config": config
+    }
+    response1 = test_client.post("/api/projects", json=project1_data)
+    assert response1.status_code == 201
+    project1 = response1.json()
+    
+    # Create second project
+    project2_data = {
+        "name": name2,
+        "description": "Second project",
+        "config": config
+    }
+    response2 = test_client.post("/api/projects", json=project2_data)
+    assert response2.status_code == 201
+    project2 = response2.json()
+    
+    # Verify both projects have IDs
+    assert "id" in project1
+    assert "id" in project2
+    
+    # Verify IDs are unique
+    assert project1["id"] != project2["id"], \
+        "Different projects should have unique IDs"
+    
+    # Verify both projects appear in list
+    list_response = test_client.get("/api/projects")
+    assert list_response.status_code == 200
+    projects_list = list_response.json()
+    
+    project_ids = [p["id"] for p in projects_list]
+    assert project1["id"] in project_ids, \
+        "First project should be in list"
+    assert project2["id"] in project_ids, \
+        "Second project should be in list"
+    
+    # Verify we can retrieve each project individually
+    get_response1 = test_client.get(f"/api/projects/{project1['id']}")
+    assert get_response1.status_code == 200
+    assert get_response1.json()["id"] == project1["id"]
+    
+    get_response2 = test_client.get(f"/api/projects/{project2['id']}")
+    assert get_response2.status_code == 200
+    assert get_response2.json()["id"] == project2["id"]

+ 299 - 0
backend/test/test_task_api.py

@@ -0,0 +1,299 @@
+"""
+Unit tests for Task API endpoints.
+Tests CRUD operations for tasks.
+"""
+import pytest
+import os
+import json
+from fastapi.testclient import TestClient
+
+# Use a test database
+TEST_DB_PATH = "test_task_annotation_platform.db"
+
+
+@pytest.fixture(scope="function", autouse=True)
+def setup_test_db():
+    """Setup test database before each test and cleanup after."""
+    # Set test database path
+    original_db_path = os.environ.get("DATABASE_PATH")
+    os.environ["DATABASE_PATH"] = TEST_DB_PATH
+    
+    # Remove existing test database
+    if os.path.exists(TEST_DB_PATH):
+        os.remove(TEST_DB_PATH)
+    
+    # Import after setting env var
+    from database import init_database
+    init_database()
+    
+    yield
+    
+    # Cleanup
+    if os.path.exists(TEST_DB_PATH):
+        os.remove(TEST_DB_PATH)
+    
+    # Restore original path
+    if original_db_path:
+        os.environ["DATABASE_PATH"] = original_db_path
+    elif "DATABASE_PATH" in os.environ:
+        del os.environ["DATABASE_PATH"]
+
+
+@pytest.fixture(scope="function")
+def test_client():
+    """Create a test client."""
+    from main import app
+    return TestClient(app)
+
+
+@pytest.fixture(scope="function")
+def sample_project(test_client):
+    """Create a sample project for testing."""
+    project_data = {
+        "name": "Test Project",
+        "description": "Test Description",
+        "config": "<View><Image name='img' value='$image'/></View>"
+    }
+    response = test_client.post("/api/projects", json=project_data)
+    return response.json()
+
+
+def test_list_tasks_empty(test_client):
+    """Test listing tasks when database is empty."""
+    response = test_client.get("/api/tasks")
+    assert response.status_code == 200
+    assert response.json() == []
+
+
+def test_create_task(test_client, sample_project):
+    """Test creating a new task."""
+    task_data = {
+        "project_id": sample_project["id"],
+        "name": "Test Task",
+        "data": {"image_url": "https://example.com/image.jpg"},
+        "assigned_to": "user_001"
+    }
+    
+    response = test_client.post("/api/tasks", json=task_data)
+    assert response.status_code == 201
+    
+    data = response.json()
+    assert data["name"] == task_data["name"]
+    assert data["project_id"] == task_data["project_id"]
+    assert data["data"] == task_data["data"]
+    assert data["assigned_to"] == task_data["assigned_to"]
+    assert data["status"] == "pending"
+    assert "id" in data
+    assert data["id"].startswith("task_")
+    assert data["progress"] == 0.0
+    assert "created_at" in data
+
+
+def test_create_task_invalid_project(test_client):
+    """Test creating a task with invalid project_id fails."""
+    task_data = {
+        "project_id": "nonexistent_project",
+        "name": "Test Task",
+        "data": {"image_url": "https://example.com/image.jpg"}
+    }
+    
+    response = test_client.post("/api/tasks", json=task_data)
+    assert response.status_code == 404
+    assert "not found" in response.json()["detail"].lower()
+
+
+def test_get_task(test_client, sample_project):
+    """Test getting a task by ID."""
+    # Create a task first
+    task_data = {
+        "project_id": sample_project["id"],
+        "name": "Test Task",
+        "data": {"image_url": "https://example.com/image.jpg"}
+    }
+    create_response = test_client.post("/api/tasks", json=task_data)
+    task_id = create_response.json()["id"]
+    
+    # Get the task
+    response = test_client.get(f"/api/tasks/{task_id}")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert data["id"] == task_id
+    assert data["name"] == task_data["name"]
+
+
+def test_get_task_not_found(test_client):
+    """Test getting a non-existent task returns 404."""
+    response = test_client.get("/api/tasks/nonexistent_id")
+    assert response.status_code == 404
+    assert "not found" in response.json()["detail"].lower()
+
+
+def test_update_task(test_client, sample_project):
+    """Test updating a task."""
+    # Create a task first
+    task_data = {
+        "project_id": sample_project["id"],
+        "name": "Original Name",
+        "data": {"image_url": "https://example.com/image.jpg"}
+    }
+    create_response = test_client.post("/api/tasks", json=task_data)
+    task_id = create_response.json()["id"]
+    
+    # Update the task
+    update_data = {
+        "name": "Updated Name",
+        "status": "in_progress"
+    }
+    response = test_client.put(f"/api/tasks/{task_id}", json=update_data)
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert data["name"] == update_data["name"]
+    assert data["status"] == update_data["status"]
+    assert data["data"] == task_data["data"]  # Data unchanged
+
+
+def test_update_task_not_found(test_client):
+    """Test updating a non-existent task returns 404."""
+    update_data = {"name": "Updated Name"}
+    response = test_client.put("/api/tasks/nonexistent_id", json=update_data)
+    assert response.status_code == 404
+
+
+def test_delete_task(test_client, sample_project):
+    """Test deleting a task."""
+    # Create a task first
+    task_data = {
+        "project_id": sample_project["id"],
+        "name": "Test Task",
+        "data": {"image_url": "https://example.com/image.jpg"}
+    }
+    create_response = test_client.post("/api/tasks", json=task_data)
+    task_id = create_response.json()["id"]
+    
+    # Delete the task
+    response = test_client.delete(f"/api/tasks/{task_id}")
+    assert response.status_code == 204
+    
+    # Verify task is deleted
+    get_response = test_client.get(f"/api/tasks/{task_id}")
+    assert get_response.status_code == 404
+
+
+def test_delete_task_not_found(test_client):
+    """Test deleting a non-existent task returns 404."""
+    response = test_client.delete("/api/tasks/nonexistent_id")
+    assert response.status_code == 404
+
+
+def test_list_tasks_after_creation(test_client, sample_project):
+    """Test listing tasks after creating some."""
+    # Create multiple tasks
+    for i in range(3):
+        task_data = {
+            "project_id": sample_project["id"],
+            "name": f"Task {i}",
+            "data": {"image_url": f"https://example.com/image{i}.jpg"}
+        }
+        test_client.post("/api/tasks", json=task_data)
+    
+    # List tasks
+    response = test_client.get("/api/tasks")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 3
+    assert all("id" in task for task in data)
+    assert all("progress" in task for task in data)
+
+
+def test_list_tasks_filter_by_project(test_client, sample_project):
+    """Test filtering tasks by project_id."""
+    # Create another project
+    project2_data = {
+        "name": "Project 2",
+        "description": "Description 2",
+        "config": "<View></View>"
+    }
+    project2_response = test_client.post("/api/projects", json=project2_data)
+    project2 = project2_response.json()
+    
+    # Create tasks for both projects
+    task1_data = {
+        "project_id": sample_project["id"],
+        "name": "Task 1",
+        "data": {"image_url": "https://example.com/image1.jpg"}
+    }
+    test_client.post("/api/tasks", json=task1_data)
+    
+    task2_data = {
+        "project_id": project2["id"],
+        "name": "Task 2",
+        "data": {"image_url": "https://example.com/image2.jpg"}
+    }
+    test_client.post("/api/tasks", json=task2_data)
+    
+    # Filter by first project
+    response = test_client.get(f"/api/tasks?project_id={sample_project['id']}")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 1
+    assert data[0]["project_id"] == sample_project["id"]
+
+
+def test_list_tasks_filter_by_status(test_client, sample_project):
+    """Test filtering tasks by status."""
+    # Create tasks with different statuses
+    task1_data = {
+        "project_id": sample_project["id"],
+        "name": "Task 1",
+        "data": {"image_url": "https://example.com/image1.jpg"}
+    }
+    response1 = test_client.post("/api/tasks", json=task1_data)
+    task1_id = response1.json()["id"]
+    
+    task2_data = {
+        "project_id": sample_project["id"],
+        "name": "Task 2",
+        "data": {"image_url": "https://example.com/image2.jpg"}
+    }
+    test_client.post("/api/tasks", json=task2_data)
+    
+    # Update first task status
+    test_client.put(f"/api/tasks/{task1_id}", json={"status": "in_progress"})
+    
+    # Filter by status
+    response = test_client.get("/api/tasks?status=in_progress")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 1
+    assert data[0]["status"] == "in_progress"
+
+
+def test_get_project_tasks(test_client, sample_project):
+    """Test getting all tasks for a specific project."""
+    # Create tasks for the project
+    for i in range(2):
+        task_data = {
+            "project_id": sample_project["id"],
+            "name": f"Task {i}",
+            "data": {"image_url": f"https://example.com/image{i}.jpg"}
+        }
+        test_client.post("/api/tasks", json=task_data)
+    
+    # Get project tasks using the alternative endpoint
+    response = test_client.get(f"/api/tasks/projects/{sample_project['id']}/tasks")
+    assert response.status_code == 200
+    
+    data = response.json()
+    assert len(data) == 2
+    assert all(task["project_id"] == sample_project["id"] for task in data)
+
+
+def test_get_project_tasks_not_found(test_client):
+    """Test getting tasks for non-existent project returns 404."""
+    response = test_client.get("/api/tasks/projects/nonexistent_id/tasks")
+    assert response.status_code == 404