""" Assignment Service for task distribution. Handles task assignment preview and dispatch to annotators. """ from datetime import datetime from typing import List, Dict, Any, Optional from database import get_db_connection from schemas.project import ProjectStatus from schemas.task import ( DispatchRequest, AssignmentPreviewRequest, AnnotatorAssignment, AssignmentPreviewResponse, DispatchResponse, ) class AssignmentService: """Service for managing task assignments.""" def get_annotator_workload(self, annotator_ids: Optional[List[str]] = None) -> Dict[str, Dict[str, Any]]: """ Get current workload for annotators. Args: annotator_ids: Optional list of annotator IDs to filter Returns: Dictionary mapping annotator_id to workload info """ with get_db_connection() as conn: cursor = conn.cursor() # Get all annotators or filter by IDs if annotator_ids: placeholders = ",".join(["?" for _ in annotator_ids]) cursor.execute(f""" SELECT id, username, email FROM users WHERE role = 'annotator' AND id IN ({placeholders}) """, annotator_ids) else: cursor.execute(""" SELECT id, username, email FROM users WHERE role = 'annotator' """) annotators = cursor.fetchall() workload = {} for annotator in annotators: annotator_id = annotator["id"] # Get task counts for this annotator cursor.execute(""" SELECT COUNT(*) as total_assigned, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending FROM tasks WHERE assigned_to = ? """, (annotator_id,)) counts = cursor.fetchone() workload[annotator_id] = { "id": annotator_id, "username": annotator["username"], "email": annotator["email"], "total_assigned": counts["total_assigned"] or 0, "completed": counts["completed"] or 0, "in_progress": counts["in_progress"] or 0, "pending": counts["pending"] or 0, } return workload def preview_assignment( self, project_id: str, annotator_ids: List[str] ) -> AssignmentPreviewResponse: """ Preview task assignment distribution. Args: project_id: Project ID annotator_ids: List of annotator IDs to assign tasks to Returns: Preview of how tasks would be distributed """ with get_db_connection() as conn: cursor = conn.cursor() # Check project exists and get status cursor.execute(""" SELECT id, name, status FROM projects WHERE id = ? """, (project_id,)) project = cursor.fetchone() if not project: raise ValueError(f"项目 '{project_id}' 不存在") # Get total and unassigned tasks count cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN assigned_to IS NULL THEN 1 ELSE 0 END) as unassigned FROM tasks WHERE project_id = ? """, (project_id,)) counts = cursor.fetchone() total_count = counts["total"] or 0 unassigned_count = counts["unassigned"] or 0 if unassigned_count == 0: raise ValueError("没有待分配的任务") if not annotator_ids: raise ValueError("请选择至少一个标注人员") # Validate annotators exist placeholders = ",".join(["?" for _ in annotator_ids]) cursor.execute(f""" SELECT id, username FROM users WHERE id IN ({placeholders}) AND role = 'annotator' """, annotator_ids) valid_annotators = cursor.fetchall() if len(valid_annotators) != len(annotator_ids): raise ValueError("部分标注人员ID无效") # Calculate distribution (round-robin style, equal distribution) num_annotators = len(annotator_ids) base_count = unassigned_count // num_annotators remainder = unassigned_count % num_annotators # Get current workload workload = self.get_annotator_workload(annotator_ids) assignments = [] for i, annotator in enumerate(valid_annotators): annotator_id = annotator["id"] # First 'remainder' annotators get one extra task task_count = base_count + (1 if i < remainder else 0) current_workload = workload.get(annotator_id, {}) # Current workload = total assigned - completed (active tasks) active_tasks = current_workload.get("total_assigned", 0) - current_workload.get("completed", 0) assignments.append(AnnotatorAssignment( user_id=annotator_id, username=annotator["username"], task_count=task_count, percentage=round(task_count / unassigned_count * 100, 1) if unassigned_count > 0 else 0, current_workload=active_tasks, )) return AssignmentPreviewResponse( project_id=project_id, total_tasks=total_count, unassigned_tasks=unassigned_count, assignments=assignments, ) def dispatch_tasks( self, project_id: str, annotator_ids: List[str], admin_id: str ) -> DispatchResponse: """ Dispatch tasks to annotators. Args: project_id: Project ID annotator_ids: List of annotator IDs to assign tasks to admin_id: ID of admin performing the dispatch Returns: Dispatch result with assignment details """ with get_db_connection() as conn: cursor = conn.cursor() # Check project exists and status cursor.execute(""" SELECT id, name, status FROM projects WHERE id = ? """, (project_id,)) project = cursor.fetchone() if not project: raise ValueError(f"项目 '{project_id}' 不存在") current_status = ProjectStatus(project["status"]) if project["status"] else ProjectStatus.DRAFT # Only allow dispatch in ready status if current_status != ProjectStatus.READY: raise ValueError(f"只能在 ready 状态下分发任务,当前状态: {current_status.value}") # Get unassigned task IDs cursor.execute(""" SELECT id FROM tasks WHERE project_id = ? AND assigned_to IS NULL ORDER BY id """, (project_id,)) unassigned_tasks = [row["id"] for row in cursor.fetchall()] if not unassigned_tasks: raise ValueError("没有待分配的任务") if not annotator_ids: raise ValueError("请选择至少一个标注人员") # Validate annotators placeholders = ",".join(["?" for _ in annotator_ids]) cursor.execute(f""" SELECT id, username FROM users WHERE id IN ({placeholders}) AND role = 'annotator' """, annotator_ids) valid_annotators = {row["id"]: row["username"] for row in cursor.fetchall()} if len(valid_annotators) != len(annotator_ids): raise ValueError("部分标注人员ID无效") # Distribute tasks num_annotators = len(annotator_ids) assignments_result = {aid: {"count": 0, "name": valid_annotators[aid]} for aid in annotator_ids} for i, task_id in enumerate(unassigned_tasks): annotator_id = annotator_ids[i % num_annotators] # Update task assignment cursor.execute(""" UPDATE tasks SET assigned_to = ?, status = 'pending' WHERE id = ? """, (annotator_id, task_id)) assignments_result[annotator_id]["count"] += 1 # Update project status to in_progress cursor.execute(""" UPDATE projects SET status = ?, updated_at = ? WHERE id = ? """, (ProjectStatus.IN_PROGRESS.value, datetime.now(), project_id)) # Build response total_assigned = len(unassigned_tasks) # Get updated workload workload = self.get_annotator_workload(annotator_ids) assignments = [] for aid, info in assignments_result.items(): current_workload = workload.get(aid, {}) active_tasks = current_workload.get("total_assigned", 0) - current_workload.get("completed", 0) assignments.append(AnnotatorAssignment( user_id=aid, username=info["name"], task_count=info["count"], percentage=round(info["count"] / total_assigned * 100, 1) if total_assigned > 0 else 0, current_workload=active_tasks, )) return DispatchResponse( project_id=project_id, success=True, total_assigned=total_assigned, assignments=assignments, project_status=ProjectStatus.IN_PROGRESS.value, ) # Singleton instance assignment_service = AssignmentService()