| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- """
- Assignment Service for task distribution.
- Handles task assignment preview and dispatch to annotators.
- """
- from datetime import datetime
- from typing import List, Dict, Any, Optional
- from database import get_db_connection
- from schemas.project import ProjectStatus
- from schemas.task import (
- DispatchRequest,
- AssignmentPreviewRequest,
- AnnotatorAssignment,
- AssignmentPreviewResponse,
- DispatchResponse,
- )
- class AssignmentService:
- """Service for managing task assignments."""
-
- def get_annotator_workload(self, annotator_ids: Optional[List[str]] = None) -> Dict[str, Dict[str, Any]]:
- """
- Get current workload for annotators.
-
- Args:
- annotator_ids: Optional list of annotator IDs to filter
-
- Returns:
- Dictionary mapping annotator_id to workload info
- """
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # Get all annotators or filter by IDs
- if annotator_ids:
- placeholders = ",".join(["?" for _ in annotator_ids])
- cursor.execute(f"""
- SELECT id, username, email
- FROM users
- WHERE role = 'annotator' AND id IN ({placeholders})
- """, annotator_ids)
- else:
- cursor.execute("""
- SELECT id, username, email
- FROM users
- WHERE role = 'annotator'
- """)
-
- annotators = cursor.fetchall()
-
- workload = {}
- for annotator in annotators:
- annotator_id = annotator["id"]
-
- # Get task counts for this annotator
- cursor.execute("""
- SELECT
- COUNT(*) as total_assigned,
- SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
- SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress,
- SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending
- FROM tasks
- WHERE assigned_to = ?
- """, (annotator_id,))
-
- counts = cursor.fetchone()
-
- workload[annotator_id] = {
- "id": annotator_id,
- "username": annotator["username"],
- "email": annotator["email"],
- "total_assigned": counts["total_assigned"] or 0,
- "completed": counts["completed"] or 0,
- "in_progress": counts["in_progress"] or 0,
- "pending": counts["pending"] or 0,
- }
-
- return workload
-
- def preview_assignment(
- self,
- project_id: str,
- annotator_ids: List[str]
- ) -> AssignmentPreviewResponse:
- """
- Preview task assignment distribution.
-
- Args:
- project_id: Project ID
- annotator_ids: List of annotator IDs to assign tasks to
-
- Returns:
- Preview of how tasks would be distributed
- """
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # Check project exists and get status
- cursor.execute("""
- SELECT id, name, status FROM projects WHERE id = ?
- """, (project_id,))
- project = cursor.fetchone()
-
- if not project:
- raise ValueError(f"项目 '{project_id}' 不存在")
-
- # Get total and unassigned tasks count
- cursor.execute("""
- SELECT
- COUNT(*) as total,
- SUM(CASE WHEN assigned_to IS NULL THEN 1 ELSE 0 END) as unassigned
- FROM tasks
- WHERE project_id = ?
- """, (project_id,))
- counts = cursor.fetchone()
- total_count = counts["total"] or 0
- unassigned_count = counts["unassigned"] or 0
-
- if unassigned_count == 0:
- raise ValueError("没有待分配的任务")
-
- if not annotator_ids:
- raise ValueError("请选择至少一个标注人员")
-
- # Validate annotators exist
- placeholders = ",".join(["?" for _ in annotator_ids])
- cursor.execute(f"""
- SELECT id, username FROM users
- WHERE id IN ({placeholders}) AND role = 'annotator'
- """, annotator_ids)
- valid_annotators = cursor.fetchall()
-
- if len(valid_annotators) != len(annotator_ids):
- raise ValueError("部分标注人员ID无效")
-
- # Calculate distribution (round-robin style, equal distribution)
- num_annotators = len(annotator_ids)
- base_count = unassigned_count // num_annotators
- remainder = unassigned_count % num_annotators
-
- # Get current workload
- workload = self.get_annotator_workload(annotator_ids)
-
- assignments = []
- for i, annotator in enumerate(valid_annotators):
- annotator_id = annotator["id"]
- # First 'remainder' annotators get one extra task
- task_count = base_count + (1 if i < remainder else 0)
-
- current_workload = workload.get(annotator_id, {})
- # Current workload = total assigned - completed (active tasks)
- active_tasks = current_workload.get("total_assigned", 0) - current_workload.get("completed", 0)
-
- assignments.append(AnnotatorAssignment(
- user_id=annotator_id,
- username=annotator["username"],
- task_count=task_count,
- percentage=round(task_count / unassigned_count * 100, 1) if unassigned_count > 0 else 0,
- current_workload=active_tasks,
- ))
-
- return AssignmentPreviewResponse(
- project_id=project_id,
- total_tasks=total_count,
- unassigned_tasks=unassigned_count,
- assignments=assignments,
- )
-
- def dispatch_tasks(
- self,
- project_id: str,
- annotator_ids: List[str],
- admin_id: str
- ) -> DispatchResponse:
- """
- Dispatch tasks to annotators.
-
- Args:
- project_id: Project ID
- annotator_ids: List of annotator IDs to assign tasks to
- admin_id: ID of admin performing the dispatch
-
- Returns:
- Dispatch result with assignment details
- """
- with get_db_connection() as conn:
- cursor = conn.cursor()
-
- # Check project exists and status
- cursor.execute("""
- SELECT id, name, status FROM projects WHERE id = ?
- """, (project_id,))
- project = cursor.fetchone()
-
- if not project:
- raise ValueError(f"项目 '{project_id}' 不存在")
-
- current_status = ProjectStatus(project["status"]) if project["status"] else ProjectStatus.DRAFT
-
- # Only allow dispatch in ready status
- if current_status != ProjectStatus.READY:
- raise ValueError(f"只能在 ready 状态下分发任务,当前状态: {current_status.value}")
-
- # Get unassigned task IDs
- cursor.execute("""
- SELECT id FROM tasks
- WHERE project_id = ? AND assigned_to IS NULL
- ORDER BY id
- """, (project_id,))
- unassigned_tasks = [row["id"] for row in cursor.fetchall()]
-
- if not unassigned_tasks:
- raise ValueError("没有待分配的任务")
-
- if not annotator_ids:
- raise ValueError("请选择至少一个标注人员")
-
- # Validate annotators
- placeholders = ",".join(["?" for _ in annotator_ids])
- cursor.execute(f"""
- SELECT id, username FROM users
- WHERE id IN ({placeholders}) AND role = 'annotator'
- """, annotator_ids)
- valid_annotators = {row["id"]: row["username"] for row in cursor.fetchall()}
-
- if len(valid_annotators) != len(annotator_ids):
- raise ValueError("部分标注人员ID无效")
-
- # Distribute tasks
- num_annotators = len(annotator_ids)
- assignments_result = {aid: {"count": 0, "name": valid_annotators[aid]} for aid in annotator_ids}
-
- for i, task_id in enumerate(unassigned_tasks):
- annotator_id = annotator_ids[i % num_annotators]
-
- # Update task assignment
- cursor.execute("""
- UPDATE tasks
- SET assigned_to = ?, status = 'pending'
- WHERE id = ?
- """, (annotator_id, task_id))
-
- assignments_result[annotator_id]["count"] += 1
-
- # Update project status to in_progress
- cursor.execute("""
- UPDATE projects
- SET status = ?, updated_at = ?
- WHERE id = ?
- """, (ProjectStatus.IN_PROGRESS.value, datetime.now(), project_id))
-
- # Build response
- total_assigned = len(unassigned_tasks)
-
- # Get updated workload
- workload = self.get_annotator_workload(annotator_ids)
-
- assignments = []
- for aid, info in assignments_result.items():
- current_workload = workload.get(aid, {})
- active_tasks = current_workload.get("total_assigned", 0) - current_workload.get("completed", 0)
-
- assignments.append(AnnotatorAssignment(
- user_id=aid,
- username=info["name"],
- task_count=info["count"],
- percentage=round(info["count"] / total_assigned * 100, 1) if total_assigned > 0 else 0,
- current_workload=active_tasks,
- ))
-
- return DispatchResponse(
- project_id=project_id,
- success=True,
- total_assigned=total_assigned,
- assignments=assignments,
- project_status=ProjectStatus.IN_PROGRESS.value,
- )
- # Singleton instance
- assignment_service = AssignmentService()
|