| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717 |
- """
- Task API router.
- Provides CRUD endpoints for task management.
- """
- import uuid
- import json
- from typing import List, Optional
- from fastapi import APIRouter, HTTPException, status, Query, Request
- from database import get_db_connection
- from schemas.task import (
- TaskCreate, TaskUpdate, TaskResponse,
- TaskAssignRequest, BatchAssignRequest, BatchAssignResponse,
- TaskAssignmentResponse, MyTasksResponse,
- AssignmentPreviewRequest, AssignmentPreviewResponse,
- DispatchRequest, DispatchResponse,
- )
- from models import Task
- from datetime import datetime
- from services.assignment_service import assignment_service
- router = APIRouter(
- prefix="/api/tasks",
- tags=["tasks"]
- )
- def calculate_progress(data_str: str, annotation_count: int) -> float:
- """计算任务进度"""
- try:
- data = json.loads(data_str) if isinstance(data_str, str) else data_str
- items = data.get('items', [])
- if not items:
- return 0.0
- return min(annotation_count / len(items), 1.0)
- except:
- return 0.0
- @router.get("", response_model=List[TaskResponse])
- async def list_tasks(
- request: Request,
- project_id: Optional[str] = Query(None, description="Filter by project ID"),
- status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"),
- assigned_to: Optional[str] = Query(None, description="Filter by assigned user")
- ):
- """
- List tasks with optional filters.
-
- For admin users: Returns all tasks matching the filters.
- For annotator users: Returns only tasks assigned to them (ignores assigned_to filter).
-
- Requires authentication.
- """
- user = request.state.user
- user_id = user["id"]
- user_role = user["role"]
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # Build query with filters
- query = """
- SELECT
- t.id,
- t.project_id,
- t.name,
- t.data,
- t.status,
- t.assigned_to,
- t.created_at,
- COUNT(a.id) as annotation_count
- FROM tasks t
- LEFT JOIN annotations a ON t.id = a.task_id
- WHERE 1=1
- """
- params = []
-
- if project_id:
- query += " AND t.project_id = ?"
- params.append(project_id)
-
- if status_filter:
- query += " AND t.status = ?"
- params.append(status_filter)
-
- # 标注员只能看到分配给自己的任务
- if user_role != "admin":
- query += " AND t.assigned_to = ?"
- params.append(user_id)
- elif assigned_to:
- # 管理员可以按 assigned_to 过滤
- query += " AND t.assigned_to = ?"
- params.append(assigned_to)
-
- query += " GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC"
-
- cursor.execute(query, tuple(params))
- rows = cursor.fetchall()
-
- tasks = []
- for row in rows:
- data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
- progress = calculate_progress(row["data"], row["annotation_count"])
-
- tasks.append(TaskResponse(
- id=row["id"],
- project_id=row["project_id"],
- name=row["name"],
- data=data,
- status=row["status"],
- assigned_to=row["assigned_to"],
- created_at=row["created_at"],
- progress=progress
- ))
-
- return tasks
- @router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
- async def create_task(request: Request, task: TaskCreate):
- """
- Create a new task.
- Requires authentication.
- """
- task_id = f"task_{uuid.uuid4().hex[:12]}"
- user = request.state.user
- assigned_to = task.assigned_to if task.assigned_to else user["id"]
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # Verify project exists
- cursor.execute("SELECT id FROM projects WHERE id = ?", (task.project_id,))
- if not cursor.fetchone():
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Project with id '{task.project_id}' not found"
- )
-
- data_json = json.dumps(task.data)
-
- cursor.execute("""
- INSERT INTO tasks (id, project_id, name, data, status, assigned_to)
- VALUES (?, ?, ?, ?, 'pending', ?)
- """, (task_id, task.project_id, task.name, data_json, assigned_to))
-
- cursor.execute("""
- SELECT id, project_id, name, data, status, assigned_to, created_at
- FROM tasks WHERE id = ?
- """, (task_id,))
-
- row = cursor.fetchone()
- data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
-
- return TaskResponse(
- id=row["id"],
- project_id=row["project_id"],
- name=row["name"],
- data=data,
- status=row["status"],
- assigned_to=row["assigned_to"],
- created_at=row["created_at"],
- progress=0.0
- )
- @router.get("/my-tasks", response_model=MyTasksResponse)
- async def get_my_tasks(
- request: Request,
- project_id: Optional[str] = Query(None, description="Filter by project ID"),
- status_filter: Optional[str] = Query(None, alias="status", description="Filter by status")
- ):
- """
- Get tasks assigned to the current user.
- Requires authentication.
-
- 标注人员只能看到分配给自己的任务。
- """
- user = request.state.user
- user_id = user["id"]
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # 构建查询条件
- where_clauses = ["t.assigned_to = ?"]
- params = [user_id]
-
- if project_id:
- where_clauses.append("t.project_id = ?")
- params.append(project_id)
-
- if status_filter:
- where_clauses.append("t.status = ?")
- params.append(status_filter)
-
- where_sql = " AND ".join(where_clauses)
-
- # 查询任务列表
- query = f"""
- SELECT
- t.id,
- t.project_id,
- t.name,
- t.data,
- t.status,
- t.assigned_to,
- t.created_at,
- COUNT(a.id) as annotation_count
- FROM tasks t
- LEFT JOIN annotations a ON t.id = a.task_id
- WHERE {where_sql}
- GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
- ORDER BY t.created_at DESC
- """
-
- cursor.execute(query, tuple(params))
- rows = cursor.fetchall()
-
- tasks = []
- completed = 0
- in_progress = 0
- pending = 0
-
- for row in rows:
- data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
- progress = calculate_progress(row["data"], row["annotation_count"])
-
- task_status = row["status"]
- if task_status == "completed":
- completed += 1
- elif task_status == "in_progress":
- in_progress += 1
- else:
- pending += 1
-
- tasks.append(TaskResponse(
- id=row["id"],
- project_id=row["project_id"],
- name=row["name"],
- data=data,
- status=task_status,
- assigned_to=row["assigned_to"],
- created_at=row["created_at"],
- progress=progress
- ))
-
- return MyTasksResponse(
- tasks=tasks,
- total=len(tasks),
- completed=completed,
- in_progress=in_progress,
- pending=pending
- )
- @router.post("/batch-assign", response_model=BatchAssignResponse)
- async def batch_assign_tasks(request: Request, assign_data: BatchAssignRequest):
- """
- Batch assign multiple tasks to multiple users.
- Requires authentication and admin role.
-
- 支持两种分配模式:
- - round_robin: 轮询分配,按顺序将任务分配给用户
- - equal: 平均分配,尽量使每个用户分配的任务数量相等
- """
- user = request.state.user
-
- # 只有管理员可以分配任务
- if user["role"] != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="只有管理员可以分配任务"
- )
-
- if assign_data.mode not in ["round_robin", "equal"]:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="分配模式必须是 'round_robin' 或 'equal'"
- )
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # 验证所有用户存在
- valid_user_ids = []
- for user_id in assign_data.user_ids:
- cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
- if cursor.fetchone():
- valid_user_ids.append(user_id)
-
- if not valid_user_ids:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="没有有效的用户可以分配任务"
- )
-
- # 验证所有任务存在
- valid_task_ids = []
- for task_id in assign_data.task_ids:
- cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
- if cursor.fetchone():
- valid_task_ids.append(task_id)
-
- if not valid_task_ids:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="没有有效的任务可以分配"
- )
-
- assignments = []
- errors = []
- assigned_at = datetime.now()
-
- if assign_data.mode == "round_robin":
- # 轮询分配
- for i, task_id in enumerate(valid_task_ids):
- user_index = i % len(valid_user_ids)
- target_user_id = valid_user_ids[user_index]
-
- try:
- cursor.execute("""
- UPDATE tasks SET assigned_to = ? WHERE id = ?
- """, (target_user_id, task_id))
-
- assignments.append(TaskAssignmentResponse(
- task_id=task_id,
- assigned_to=target_user_id,
- assigned_by=user["id"],
- assigned_at=assigned_at
- ))
- except Exception as e:
- errors.append({
- "task_id": task_id,
- "error": str(e)
- })
-
- else: # equal 模式
- # 平均分配:计算每个用户应该分配的任务数
- num_tasks = len(valid_task_ids)
- num_users = len(valid_user_ids)
- base_count = num_tasks // num_users
- extra_count = num_tasks % num_users
-
- task_index = 0
- for user_index, target_user_id in enumerate(valid_user_ids):
- # 前 extra_count 个用户多分配一个任务
- count = base_count + (1 if user_index < extra_count else 0)
-
- for _ in range(count):
- if task_index >= len(valid_task_ids):
- break
-
- task_id = valid_task_ids[task_index]
- task_index += 1
-
- try:
- cursor.execute("""
- UPDATE tasks SET assigned_to = ? WHERE id = ?
- """, (target_user_id, task_id))
-
- assignments.append(TaskAssignmentResponse(
- task_id=task_id,
- assigned_to=target_user_id,
- assigned_by=user["id"],
- assigned_at=assigned_at
- ))
- except Exception as e:
- errors.append({
- "task_id": task_id,
- "error": str(e)
- })
-
- return BatchAssignResponse(
- success_count=len(assignments),
- failed_count=len(errors),
- assignments=assignments,
- errors=errors
- )
- @router.get("/{task_id}", response_model=TaskResponse)
- async def get_task(request: Request, task_id: str):
- """
- Get task by ID.
- Requires authentication.
- """
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- cursor.execute("""
- SELECT
- t.id,
- t.project_id,
- t.name,
- t.data,
- t.status,
- t.assigned_to,
- t.created_at,
- COUNT(a.id) as annotation_count
- FROM tasks t
- LEFT JOIN annotations a ON t.id = a.task_id
- WHERE t.id = ?
- GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
- """, (task_id,))
-
- row = cursor.fetchone()
-
- if not row:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Task with id '{task_id}' not found"
- )
-
- data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
- progress = calculate_progress(row["data"], row["annotation_count"])
-
- return TaskResponse(
- id=row["id"],
- project_id=row["project_id"],
- name=row["name"],
- data=data,
- status=row["status"],
- assigned_to=row["assigned_to"],
- created_at=row["created_at"],
- progress=progress
- )
- @router.put("/{task_id}", response_model=TaskResponse)
- async def update_task(request: Request, task_id: str, task: TaskUpdate):
- """
- Update an existing task.
- Requires authentication.
- """
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
- if not cursor.fetchone():
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Task with id '{task_id}' not found"
- )
-
- update_fields = []
- update_values = []
-
- if task.name is not None:
- update_fields.append("name = ?")
- update_values.append(task.name)
-
- if task.data is not None:
- update_fields.append("data = ?")
- update_values.append(json.dumps(task.data))
-
- if task.status is not None:
- update_fields.append("status = ?")
- update_values.append(task.status)
-
- if task.assigned_to is not None:
- update_fields.append("assigned_to = ?")
- update_values.append(task.assigned_to)
-
- if update_fields:
- update_values.append(task_id)
- cursor.execute(f"""
- UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?
- """, tuple(update_values))
-
- cursor.execute("""
- SELECT
- t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at,
- COUNT(a.id) as annotation_count
- FROM tasks t
- LEFT JOIN annotations a ON t.id = a.task_id
- WHERE t.id = ?
- GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
- """, (task_id,))
-
- row = cursor.fetchone()
- data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
- progress = calculate_progress(row["data"], row["annotation_count"])
-
- return TaskResponse(
- id=row["id"],
- project_id=row["project_id"],
- name=row["name"],
- data=data,
- status=row["status"],
- assigned_to=row["assigned_to"],
- created_at=row["created_at"],
- progress=progress
- )
- @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
- async def delete_task(request: Request, task_id: str):
- """
- Delete a task and all associated annotations.
- Requires authentication and admin role.
- """
- user = request.state.user
- if user["role"] != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="只有管理员可以删除任务"
- )
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,))
- if not cursor.fetchone():
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Task with id '{task_id}' not found"
- )
-
- cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
- return None
- @router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse])
- async def get_project_tasks(request: Request, project_id: str):
- """
- Get tasks for a specific project.
-
- For admin users: Returns all tasks in the project.
- For annotator users: Returns only tasks assigned to them.
-
- Requires authentication.
- """
- user = request.state.user
- user_id = user["id"]
- user_role = user["role"]
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
- if not cursor.fetchone():
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Project with id '{project_id}' not found"
- )
-
- if user_role == "admin":
- # 管理员:返回项目的所有任务
- cursor.execute("""
- SELECT
- t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at,
- COUNT(a.id) as annotation_count
- FROM tasks t
- LEFT JOIN annotations a ON t.id = a.task_id
- WHERE t.project_id = ?
- GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
- ORDER BY t.created_at DESC
- """, (project_id,))
- else:
- # 标注员:只返回分配给自己的任务
- cursor.execute("""
- SELECT
- t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at,
- COUNT(a.id) as annotation_count
- FROM tasks t
- LEFT JOIN annotations a ON t.id = a.task_id
- WHERE t.project_id = ? AND t.assigned_to = ?
- GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at
- ORDER BY t.created_at DESC
- """, (project_id, user_id))
-
- rows = cursor.fetchall()
-
- tasks = []
- for row in rows:
- data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
- progress = calculate_progress(row["data"], row["annotation_count"])
-
- tasks.append(TaskResponse(
- id=row["id"],
- project_id=row["project_id"],
- name=row["name"],
- data=data,
- status=row["status"],
- assigned_to=row["assigned_to"],
- created_at=row["created_at"],
- progress=progress
- ))
-
- return tasks
- @router.put("/{task_id}/assign", response_model=TaskAssignmentResponse)
- async def assign_task(request: Request, task_id: str, assign_data: TaskAssignRequest):
- """
- Assign a task to a specific user.
- Requires authentication and admin role.
-
- 将任务分配给指定用户,记录分配时间和分配人。
- """
- user = request.state.user
-
- # 只有管理员可以分配任务
- if user["role"] != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="只有管理员可以分配任务"
- )
-
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # 验证任务存在
- cursor.execute("SELECT id, assigned_to FROM tasks WHERE id = ?", (task_id,))
- task_row = cursor.fetchone()
- if not task_row:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"任务 '{task_id}' 不存在"
- )
-
- # 验证用户存在
- cursor.execute("SELECT id FROM users WHERE id = ?", (assign_data.user_id,))
- if not cursor.fetchone():
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"用户 '{assign_data.user_id}' 不存在"
- )
-
- # 更新任务分配
- cursor.execute("""
- UPDATE tasks SET assigned_to = ? WHERE id = ?
- """, (assign_data.user_id, task_id))
-
- assigned_at = datetime.now()
-
- return TaskAssignmentResponse(
- task_id=task_id,
- assigned_to=assign_data.user_id,
- assigned_by=user["id"],
- assigned_at=assigned_at
- )
- @router.post("/preview-assignment/{project_id}", response_model=AssignmentPreviewResponse)
- async def preview_assignment(
- request: Request,
- project_id: str,
- preview_request: AssignmentPreviewRequest
- ):
- """
- Preview task assignment distribution for a project.
-
- Shows how tasks would be distributed among selected annotators
- without actually performing the assignment.
-
- Requires authentication and admin role.
- """
- user = request.state.user
-
- if user["role"] != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="只有管理员可以预览任务分配"
- )
-
- try:
- preview = assignment_service.preview_assignment(
- project_id=project_id,
- annotator_ids=preview_request.user_ids
- )
- return preview
- except ValueError as e:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=str(e)
- )
- @router.post("/dispatch/{project_id}", response_model=DispatchResponse)
- async def dispatch_tasks(
- request: Request,
- project_id: str,
- dispatch_request: DispatchRequest
- ):
- """
- Dispatch (assign) all unassigned tasks in a project to selected annotators.
-
- This is a one-click operation that:
- 1. Distributes tasks evenly among selected annotators
- 2. Updates project status to 'in_progress'
-
- Only works when project is in 'ready' status.
-
- Requires authentication and admin role.
- """
- user = request.state.user
-
- if user["role"] != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="只有管理员可以分发任务"
- )
-
- try:
- result = assignment_service.dispatch_tasks(
- project_id=project_id,
- annotator_ids=dispatch_request.user_ids,
- admin_id=user["id"]
- )
- return result
- except ValueError as e:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=str(e)
- )
|