""" Task API router. Provides CRUD endpoints for task management. """ import uuid import json from typing import List, Optional from fastapi import APIRouter, HTTPException, status, Query, Request from database import get_db_connection from schemas.task import ( TaskCreate, TaskUpdate, TaskResponse, TaskAssignRequest, BatchAssignRequest, BatchAssignResponse, TaskAssignmentResponse, MyTasksResponse, AssignmentPreviewRequest, AssignmentPreviewResponse, DispatchRequest, DispatchResponse, ) from models import Task from datetime import datetime from services.assignment_service import assignment_service router = APIRouter( prefix="/api/tasks", tags=["tasks"] ) def calculate_progress(data_str: str, annotation_count: int) -> float: """计算任务进度""" try: data = json.loads(data_str) if isinstance(data_str, str) else data_str items = data.get('items', []) if not items: return 0.0 return min(annotation_count / len(items), 1.0) except: return 0.0 @router.get("", response_model=List[TaskResponse]) async def list_tasks( request: Request, project_id: Optional[str] = Query(None, description="Filter by project ID"), status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"), assigned_to: Optional[str] = Query(None, description="Filter by assigned user") ): """ List tasks with optional filters. For admin users: Returns all tasks matching the filters. For annotator users: Returns only tasks assigned to them (ignores assigned_to filter). Requires authentication. """ user = request.state.user user_id = user["id"] user_role = user["role"] with get_db_connection() as conn: cursor = conn.cursor() # Build query with filters query = """ SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE 1=1 """ params = [] if project_id: query += " AND t.project_id = ?" params.append(project_id) if status_filter: query += " AND t.status = ?" params.append(status_filter) # 标注员只能看到分配给自己的任务 if user_role != "admin": query += " AND t.assigned_to = ?" params.append(user_id) elif assigned_to: # 管理员可以按 assigned_to 过滤 query += " AND t.assigned_to = ?" params.append(assigned_to) query += " GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC" cursor.execute(query, tuple(params)) rows = cursor.fetchall() tasks = [] for row in rows: data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress )) return tasks @router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task(request: Request, task: TaskCreate): """ Create a new task. Requires authentication. """ task_id = f"task_{uuid.uuid4().hex[:12]}" user = request.state.user assigned_to = task.assigned_to if task.assigned_to else user["id"] with get_db_connection() as conn: cursor = conn.cursor() # Verify project exists cursor.execute("SELECT id FROM projects WHERE id = ?", (task.project_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id '{task.project_id}' not found" ) data_json = json.dumps(task.data) cursor.execute(""" INSERT INTO tasks (id, project_id, name, data, status, assigned_to) VALUES (?, ?, ?, ?, 'pending', ?) """, (task_id, task.project_id, task.name, data_json, assigned_to)) cursor.execute(""" SELECT id, project_id, name, data, status, assigned_to, created_at FROM tasks WHERE id = ? """, (task_id,)) row = cursor.fetchone() data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=0.0 ) @router.get("/my-tasks", response_model=MyTasksResponse) async def get_my_tasks( request: Request, project_id: Optional[str] = Query(None, description="Filter by project ID"), status_filter: Optional[str] = Query(None, alias="status", description="Filter by status") ): """ Get tasks assigned to the current user. Requires authentication. 标注人员只能看到分配给自己的任务。 """ user = request.state.user user_id = user["id"] with get_db_connection() as conn: cursor = conn.cursor() # 构建查询条件 where_clauses = ["t.assigned_to = ?"] params = [user_id] if project_id: where_clauses.append("t.project_id = ?") params.append(project_id) if status_filter: where_clauses.append("t.status = ?") params.append(status_filter) where_sql = " AND ".join(where_clauses) # 查询任务列表 query = f""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE {where_sql} GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC """ cursor.execute(query, tuple(params)) rows = cursor.fetchall() tasks = [] completed = 0 in_progress = 0 pending = 0 for row in rows: data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) task_status = row["status"] if task_status == "completed": completed += 1 elif task_status == "in_progress": in_progress += 1 else: pending += 1 tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=task_status, assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress )) return MyTasksResponse( tasks=tasks, total=len(tasks), completed=completed, in_progress=in_progress, pending=pending ) @router.post("/batch-assign", response_model=BatchAssignResponse) async def batch_assign_tasks(request: Request, assign_data: BatchAssignRequest): """ Batch assign multiple tasks to multiple users. Requires authentication and admin role. 支持两种分配模式: - round_robin: 轮询分配,按顺序将任务分配给用户 - equal: 平均分配,尽量使每个用户分配的任务数量相等 """ user = request.state.user # 只有管理员可以分配任务 if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以分配任务" ) if assign_data.mode not in ["round_robin", "equal"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="分配模式必须是 'round_robin' 或 'equal'" ) with get_db_connection() as conn: cursor = conn.cursor() # 验证所有用户存在 valid_user_ids = [] for user_id in assign_data.user_ids: cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,)) if cursor.fetchone(): valid_user_ids.append(user_id) if not valid_user_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="没有有效的用户可以分配任务" ) # 验证所有任务存在 valid_task_ids = [] for task_id in assign_data.task_ids: cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if cursor.fetchone(): valid_task_ids.append(task_id) if not valid_task_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="没有有效的任务可以分配" ) assignments = [] errors = [] assigned_at = datetime.now() if assign_data.mode == "round_robin": # 轮询分配 for i, task_id in enumerate(valid_task_ids): user_index = i % len(valid_user_ids) target_user_id = valid_user_ids[user_index] try: cursor.execute(""" UPDATE tasks SET assigned_to = ? WHERE id = ? """, (target_user_id, task_id)) assignments.append(TaskAssignmentResponse( task_id=task_id, assigned_to=target_user_id, assigned_by=user["id"], assigned_at=assigned_at )) except Exception as e: errors.append({ "task_id": task_id, "error": str(e) }) else: # equal 模式 # 平均分配:计算每个用户应该分配的任务数 num_tasks = len(valid_task_ids) num_users = len(valid_user_ids) base_count = num_tasks // num_users extra_count = num_tasks % num_users task_index = 0 for user_index, target_user_id in enumerate(valid_user_ids): # 前 extra_count 个用户多分配一个任务 count = base_count + (1 if user_index < extra_count else 0) for _ in range(count): if task_index >= len(valid_task_ids): break task_id = valid_task_ids[task_index] task_index += 1 try: cursor.execute(""" UPDATE tasks SET assigned_to = ? WHERE id = ? """, (target_user_id, task_id)) assignments.append(TaskAssignmentResponse( task_id=task_id, assigned_to=target_user_id, assigned_by=user["id"], assigned_at=assigned_at )) except Exception as e: errors.append({ "task_id": task_id, "error": str(e) }) return BatchAssignResponse( success_count=len(assignments), failed_count=len(errors), assignments=assignments, errors=errors ) @router.get("/{task_id}", response_model=TaskResponse) async def get_task(request: Request, task_id: str): """ Get task by ID. Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at """, (task_id,)) row = cursor.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress ) @router.put("/{task_id}", response_model=TaskResponse) async def update_task(request: Request, task_id: str, task: TaskUpdate): """ Update an existing task. Requires authentication. """ with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) update_fields = [] update_values = [] if task.name is not None: update_fields.append("name = ?") update_values.append(task.name) if task.data is not None: update_fields.append("data = ?") update_values.append(json.dumps(task.data)) if task.status is not None: update_fields.append("status = ?") update_values.append(task.status) if task.assigned_to is not None: update_fields.append("assigned_to = ?") update_values.append(task.assigned_to) if update_fields: update_values.append(task_id) cursor.execute(f""" UPDATE tasks SET {', '.join(update_fields)} WHERE id = ? """, tuple(update_values)) cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.id = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at """, (task_id,)) row = cursor.fetchone() data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) return TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress ) @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_task(request: Request, task_id: str): """ Delete a task and all associated annotations. Requires authentication and admin role. """ user = request.state.user if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以删除任务" ) with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with id '{task_id}' not found" ) cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) return None @router.get("/projects/{project_id}/tasks", response_model=List[TaskResponse]) async def get_project_tasks(request: Request, project_id: str): """ Get tasks for a specific project. For admin users: Returns all tasks in the project. For annotator users: Returns only tasks assigned to them. Requires authentication. """ user = request.state.user user_id = user["id"] user_role = user["role"] with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id '{project_id}' not found" ) if user_role == "admin": # 管理员:返回项目的所有任务 cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.project_id = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC """, (project_id,)) else: # 标注员:只返回分配给自己的任务 cursor.execute(""" SELECT t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at, COUNT(a.id) as annotation_count FROM tasks t LEFT JOIN annotations a ON t.id = a.task_id WHERE t.project_id = ? AND t.assigned_to = ? GROUP BY t.id, t.project_id, t.name, t.data, t.status, t.assigned_to, t.created_at ORDER BY t.created_at DESC """, (project_id, user_id)) rows = cursor.fetchall() tasks = [] for row in rows: data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] progress = calculate_progress(row["data"], row["annotation_count"]) tasks.append(TaskResponse( id=row["id"], project_id=row["project_id"], name=row["name"], data=data, status=row["status"], assigned_to=row["assigned_to"], created_at=row["created_at"], progress=progress )) return tasks @router.put("/{task_id}/assign", response_model=TaskAssignmentResponse) async def assign_task(request: Request, task_id: str, assign_data: TaskAssignRequest): """ Assign a task to a specific user. Requires authentication and admin role. 将任务分配给指定用户,记录分配时间和分配人。 """ user = request.state.user # 只有管理员可以分配任务 if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以分配任务" ) with get_db_connection() as conn: cursor = conn.cursor() # 验证任务存在 cursor.execute("SELECT id, assigned_to FROM tasks WHERE id = ?", (task_id,)) task_row = cursor.fetchone() if not task_row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务 '{task_id}' 不存在" ) # 验证用户存在 cursor.execute("SELECT id FROM users WHERE id = ?", (assign_data.user_id,)) if not cursor.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"用户 '{assign_data.user_id}' 不存在" ) # 更新任务分配 cursor.execute(""" UPDATE tasks SET assigned_to = ? WHERE id = ? """, (assign_data.user_id, task_id)) assigned_at = datetime.now() return TaskAssignmentResponse( task_id=task_id, assigned_to=assign_data.user_id, assigned_by=user["id"], assigned_at=assigned_at ) @router.post("/preview-assignment/{project_id}", response_model=AssignmentPreviewResponse) async def preview_assignment( request: Request, project_id: str, preview_request: AssignmentPreviewRequest ): """ Preview task assignment distribution for a project. Shows how tasks would be distributed among selected annotators without actually performing the assignment. Requires authentication and admin role. """ user = request.state.user if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以预览任务分配" ) try: preview = assignment_service.preview_assignment( project_id=project_id, annotator_ids=preview_request.user_ids ) return preview except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) @router.post("/dispatch/{project_id}", response_model=DispatchResponse) async def dispatch_tasks( request: Request, project_id: str, dispatch_request: DispatchRequest ): """ Dispatch (assign) all unassigned tasks in a project to selected annotators. This is a one-click operation that: 1. Distributes tasks evenly among selected annotators 2. Updates project status to 'in_progress' Only works when project is in 'ready' status. Requires authentication and admin role. """ user = request.state.user if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以分发任务" ) try: result = assignment_service.dispatch_tasks( project_id=project_id, annotator_ids=dispatch_request.user_ids, admin_id=user["id"] ) return result except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) )