| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- """
- Task API router.
- Provides CRUD endpoints for task management.
- """
- import uuid
- import json
- from typing import List, Optional
- from fastapi import APIRouter, HTTPException, status, Query, Request
- from database import get_db_connection
- from schemas.task import TaskCreate, TaskUpdate, TaskResponse
- from models import Task
# Router shared by the task endpoints in this module; routes registered on it
# are mounted under the /api/tasks prefix and tagged "tasks".
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
def calculate_progress(data_str: str, annotation_count: int) -> float:
    """Compute a task's completion ratio.

    Progress is the annotation count divided by the number of entries under
    the payload's ``items`` key, clamped to the range ``[0.0, 1.0]``.

    Args:
        data_str: The task payload, either as a JSON string or as an
            already-decoded mapping (non-string values are used as-is).
        annotation_count: Number of annotations recorded for the task.

    Returns:
        The completion ratio, or ``0.0`` when the payload cannot be decoded,
        is not a mapping, or contains no items.
    """
    try:
        data = json.loads(data_str) if isinstance(data_str, str) else data_str
        items = data.get("items", [])
        if not items:
            return 0.0
        ratio = annotation_count / len(items)
    except (ValueError, TypeError, AttributeError):
        # ValueError: invalid JSON (JSONDecodeError is a ValueError subclass).
        # AttributeError: the payload is not a mapping (no ``.get``).
        # TypeError: unsized ``items`` or a non-numeric count.
        # Progress is best-effort, so these degrade to 0.0 instead of failing
        # the request; anything else (e.g. KeyboardInterrupt) propagates.
        return 0.0
    # Clamp on both sides so a bogus negative count cannot yield negative progress.
    return max(0.0, min(ratio, 1.0))
@router.get("", response_model=List[TaskResponse])
async def list_tasks(
    request: Request,
    project_id: Optional[str] = Query(None, description="Filter by project ID"),
    status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"),
    assigned_to: Optional[str] = Query(None, description="Filter by assigned user")
):
    """
    Return tasks ordered by creation time (newest first), narrowed by any
    of the optional project / status / assignee filters that were supplied.

    Each task carries a progress value derived from its annotation count.
    NOTE(review): the endpoint is described as requiring authentication, but
    no check happens in this function — presumably enforced upstream; confirm.
    """
    # SQL condition paired with the value it binds; a filter only takes part
    # in the query when its value is truthy.
    optional_filters = (
        (" AND t.project_id = ?", project_id),
        (" AND t.status = ?", status_filter),
        (" AND t.assigned_to = ?", assigned_to),
    )
    active_filters = [pair for pair in optional_filters if pair[1]]

    sql = """
        SELECT
            t.id,
            t.project_id,
            t.name,
            t.data,
            t.status,
            t.assigned_to,
            t.created_at,
            COUNT(a.id) as annotation_count
        FROM tasks t
        LEFT JOIN annotations a ON t.id = a.task_id
        WHERE 1=1
    """
    sql += "".join(condition for condition, _ in active_filters)
    sql += " GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC"
    bound_values = tuple(value for _, value in active_filters)

    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql, bound_values)

        # The data column may come back as JSON text or as an already-decoded
        # value; only text is decoded. Progress is computed from the raw column.
        return [
            TaskResponse(
                id=record["id"],
                project_id=record["project_id"],
                name=record["name"],
                data=(
                    json.loads(record["data"])
                    if isinstance(record["data"], str)
                    else record["data"]
                ),
                status=record["status"],
                assigned_to=record["assigned_to"],
                created_at=record["created_at"],
                progress=calculate_progress(record["data"], record["annotation_count"]),
            )
            for record in cursor.fetchall()
        ]
@router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
async def create_task(request: Request, task: TaskCreate):
    """
    Insert a new task in 'pending' status and return the stored record.

    The task is assigned to ``task.assigned_to`` when given, otherwise to the
    id of the user found on ``request.state.user``.
    Raises a 404 HTTPException when the referenced project does not exist.
    """
    new_task_id = "task_" + uuid.uuid4().hex[:12]
    current_user = request.state.user
    assignee = task.assigned_to or current_user["id"]

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Refuse to create a task under a project that is not there.
        cursor.execute("SELECT id FROM projects WHERE id = ?", (task.project_id,))
        project_row = cursor.fetchone()
        if not project_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with id '{task.project_id}' not found"
            )

        insert_values = (
            new_task_id,
            task.project_id,
            task.name,
            json.dumps(task.data),
            assignee,
        )
        cursor.execute("""
            INSERT INTO tasks (id, project_id, name, data, status, assigned_to)
            VALUES (?, ?, ?, ?, 'pending', ?)
        """, insert_values)

        # Read the row back so database-populated columns (created_at) are included.
        cursor.execute("""
            SELECT id, project_id, name, data, status, assigned_to, created_at
            FROM tasks WHERE id = ?
        """, (new_task_id,))
        stored = cursor.fetchone()

        payload = stored["data"]
        if isinstance(payload, str):
            payload = json.loads(payload)

        # A brand-new task has no annotations yet, hence zero progress.
        return TaskResponse(
            id=stored["id"],
            project_id=stored["project_id"],
            name=stored["name"],
            data=payload,
            status=stored["status"],
            assigned_to=stored["assigned_to"],
            created_at=stored["created_at"],
            progress=0.0
        )
@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(request: Request, task_id: str):
    """
    Fetch a single task together with its annotation-based progress.

    Raises a 404 HTTPException when no task has the given id.
    NOTE(review): the endpoint is described as requiring authentication, but
    no check happens in this function — presumably enforced upstream; confirm.
    """
    lookup_sql = """
        SELECT
            t.id,
            t.project_id,
            t.name,
            t.data,
            t.status,
            t.assigned_to,
            t.created_at,
            COUNT(a.id) as annotation_count
        FROM tasks t
        LEFT JOIN annotations a ON t.id = a.task_id
        WHERE t.id = ?
        GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
    """

    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(lookup_sql, (task_id,))
        record = cursor.fetchone()

        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id '{task_id}' not found"
            )

        raw_data = record["data"]
        # Only JSON text needs decoding; other values are passed through.
        decoded = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
        completion = calculate_progress(raw_data, record["annotation_count"])

        return TaskResponse(
            id=record["id"],
            project_id=record["project_id"],
            name=record["name"],
            data=decoded,
            status=record["status"],
            assigned_to=record["assigned_to"],
            created_at=record["created_at"],
            progress=completion
        )
@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(request: Request, task_id: str, task: TaskUpdate):
    """
    Apply a partial update to a task and return its current state.

    Only fields of ``task`` that are not None are written; when none are set
    no UPDATE is issued and the existing record is returned as-is.
    Raises a 404 HTTPException when no task has the given id.
    NOTE(review): the endpoint is described as requiring authentication, but
    no check happens in this function — presumably enforced upstream; confirm.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
        existing = cursor.fetchone()
        if not existing:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id '{task_id}' not found"
            )

        # Collect "column = ?" fragments and their bound values for every
        # field the caller actually provided. The fragments are fixed
        # literals, so interpolating them into the statement is safe.
        set_clauses = []
        set_values = []
        for fragment, value, needs_json in (
            ("name = ?", task.name, False),
            ("data = ?", task.data, True),
            ("status = ?", task.status, False),
            ("assigned_to = ?", task.assigned_to, False),
        ):
            if value is None:
                continue
            set_clauses.append(fragment)
            set_values.append(json.dumps(value) if needs_json else value)

        if set_clauses:
            cursor.execute(f"""
                UPDATE tasks SET {', '.join(set_clauses)} WHERE id = ?
            """, (*set_values, task_id))

        # Re-read the task (with its annotation count) to build the response.
        cursor.execute("""
            SELECT
                t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at,
                COUNT(a.id) as annotation_count
            FROM tasks t
            LEFT JOIN annotations a ON t.id = a.task_id
            WHERE t.id = ?
            GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
        """, (task_id,))
        record = cursor.fetchone()

        raw_data = record["data"]
        decoded = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
        completion = calculate_progress(raw_data, record["annotation_count"])

        return TaskResponse(
            id=record["id"],
            project_id=record["project_id"],
            name=record["name"],
            data=decoded,
            status=record["status"],
            assigned_to=record["assigned_to"],
            created_at=record["created_at"],
            progress=completion
        )
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task(request: Request, task_id: str):
    """
    Delete a task; only users whose role is "admin" may do so.

    Raises a 403 HTTPException for non-admin users and a 404 HTTPException
    when no task has the given id.
    NOTE(review): only the tasks row is deleted here. Removal of the task's
    annotations presumably relies on a database-level cascade — confirm.
    """
    # The 403 detail message reads "Only administrators can delete tasks".
    if request.state.user["role"] != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="只有管理员可以删除任务"
        )

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
        existing = cursor.fetchone()
        if not existing:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id '{task_id}' not found"
            )

        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))

    # 204 No Content: there is no response body.
    return None
@router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse])
async def get_project_tasks(request: Request, project_id: str):
    """
    Return every task of one project, newest first, each with its progress.

    Raises a 404 HTTPException when the project does not exist.
    NOTE(review): the endpoint is described as requiring authentication, but
    no check happens in this function — presumably enforced upstream; confirm.
    """

    def to_response(record):
        # Decode the data column only when it arrives as JSON text; progress
        # is computed from the raw column value.
        raw_data = record["data"]
        decoded = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
        completion = calculate_progress(raw_data, record["annotation_count"])
        return TaskResponse(
            id=record["id"],
            project_id=record["project_id"],
            name=record["name"],
            data=decoded,
            status=record["status"],
            assigned_to=record["assigned_to"],
            created_at=record["created_at"],
            progress=completion
        )

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Distinguish "unknown project" (404) from "project without tasks" ([]).
        cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
        project_row = cursor.fetchone()
        if not project_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with id '{project_id}' not found"
            )

        cursor.execute("""
            SELECT
                t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at,
                COUNT(a.id) as annotation_count
            FROM tasks t
            LEFT JOIN annotations a ON t.id = a.task_id
            WHERE t.project_id = ?
            GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
            ORDER BY t.created_at DESC
        """, (project_id,))

        return [to_response(record) for record in cursor.fetchall()]
|