""" Task Assignment API tests. Tests for task assignment endpoints. """ import pytest import uuid import json from fastapi.testclient import TestClient from main import app from database import get_db_connection, init_database from services.jwt_service import JWTService import bcrypt # 测试客户端 client = TestClient(app) @pytest.fixture(scope="module") def setup_database(): """初始化测试数据库""" init_database() yield @pytest.fixture def admin_user(setup_database): """创建管理员用户""" admin_id = f"admin_{uuid.uuid4().hex[:8]}" password_hash = bcrypt.hashpw("admin123".encode(), bcrypt.gensalt()).decode() with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO users (id, username, email, password_hash, role) VALUES (?, ?, ?, ?, 'admin') """, (admin_id, f"test_admin_{admin_id}", f"admin_{admin_id}@test.com", password_hash)) user_data = { "id": admin_id, "username": f"test_admin_{admin_id}", "email": f"admin_{admin_id}@test.com", "role": "admin" } token = JWTService.create_access_token(user_data) yield {"token": token, "user_id": admin_id, "user_data": user_data} # 清理 with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM users WHERE id = ?", (admin_id,)) @pytest.fixture def annotator_users(setup_database): """创建多个标注人员用户""" annotators = [] for i in range(3): annotator_id = f"annotator_{uuid.uuid4().hex[:8]}" password_hash = bcrypt.hashpw("annotator123".encode(), bcrypt.gensalt()).decode() with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO users (id, username, email, password_hash, role) VALUES (?, ?, ?, ?, 'annotator') """, (annotator_id, f"test_annotator_{annotator_id}", f"annotator_{annotator_id}@test.com", password_hash)) user_data = { "id": annotator_id, "username": f"test_annotator_{annotator_id}", "email": f"annotator_{annotator_id}@test.com", "role": "annotator" } token = JWTService.create_access_token(user_data) annotators.append({"token": token, "user_id": annotator_id, "user_data": user_data}) yield annotators # 清理 with get_db_connection() as conn: cursor = conn.cursor() for annotator in annotators: cursor.execute("DELETE FROM users WHERE id = ?", (annotator["user_id"],)) @pytest.fixture def test_project(setup_database): """创建测试项目""" project_id = f"proj_{uuid.uuid4().hex[:8]}" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO projects (id, name, description, config) VALUES (?, ?, ?, ?) """, (project_id, "Test Project", "Test Description", "")) yield project_id # 清理 with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM projects WHERE id = ?", (project_id,)) @pytest.fixture def test_tasks(setup_database, test_project): """创建测试任务""" task_ids = [] for i in range(5): task_id = f"task_{uuid.uuid4().hex[:8]}" data = json.dumps({"items": [{"id": j} for j in range(3)]}) with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO tasks (id, project_id, name, data, status) VALUES (?, ?, ?, ?, 'pending') """, (task_id, test_project, f"Test Task {i}", data)) task_ids.append(task_id) yield task_ids # 清理 with get_db_connection() as conn: cursor = conn.cursor() for task_id in task_ids: cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) class TestTaskAssignment: """单个任务分配测试""" def test_assign_task_without_auth(self, setup_database, test_tasks): """未认证时应返回 401""" response = client.put( f"/api/tasks/{test_tasks[0]}/assign", json={"user_id": "some_user"} ) assert response.status_code == 401 def test_assign_task_as_annotator(self, annotator_users, test_tasks): """标注人员分配任务应返回 403""" headers = {"Authorization": f"Bearer {annotator_users[0]['token']}"} response = client.put( f"/api/tasks/{test_tasks[0]}/assign", json={"user_id": annotator_users[1]["user_id"]}, headers=headers ) assert response.status_code == 403 def test_assign_task_as_admin(self, admin_user, annotator_users, test_tasks): """管理员可以分配任务""" headers = {"Authorization": f"Bearer {admin_user['token']}"} response = client.put( f"/api/tasks/{test_tasks[0]}/assign", json={"user_id": annotator_users[0]["user_id"]}, headers=headers ) assert response.status_code == 200 data = response.json() assert data["task_id"] == test_tasks[0] assert data["assigned_to"] == annotator_users[0]["user_id"] assert data["assigned_by"] == admin_user["user_id"] def test_assign_nonexistent_task(self, admin_user, annotator_users): """分配不存在的任务应返回 404""" headers = {"Authorization": f"Bearer {admin_user['token']}"} response = client.put( "/api/tasks/nonexistent_task/assign", json={"user_id": annotator_users[0]["user_id"]}, headers=headers ) assert response.status_code == 404 def test_assign_to_nonexistent_user(self, admin_user, test_tasks): """分配给不存在的用户应返回 404""" headers = {"Authorization": f"Bearer {admin_user['token']}"} response = client.put( f"/api/tasks/{test_tasks[0]}/assign", json={"user_id": "nonexistent_user"}, headers=headers ) assert response.status_code == 404 class TestBatchAssignment: """批量任务分配测试""" def test_batch_assign_without_auth(self, setup_database, test_tasks, annotator_users): """未认证时应返回 401""" response = client.post( "/api/tasks/batch-assign", json={ "task_ids": test_tasks[:2], "user_ids": [annotator_users[0]["user_id"]], "mode": "round_robin" } ) assert response.status_code == 401 def test_batch_assign_round_robin(self, admin_user, annotator_users, test_tasks): """轮询分配模式测试""" headers = {"Authorization": f"Bearer {admin_user['token']}"} user_ids = [a["user_id"] for a in annotator_users[:2]] response = client.post( "/api/tasks/batch-assign", json={ "task_ids": test_tasks, "user_ids": user_ids, "mode": "round_robin" }, headers=headers ) assert response.status_code == 200 data = response.json() assert data["success_count"] == len(test_tasks) assert data["failed_count"] == 0 # 验证轮询分配 assignments = data["assignments"] for i, assignment in enumerate(assignments): expected_user = user_ids[i % len(user_ids)] assert assignment["assigned_to"] == expected_user def test_batch_assign_equal(self, admin_user, annotator_users, test_tasks): """平均分配模式测试""" headers = {"Authorization": f"Bearer {admin_user['token']}"} user_ids = [a["user_id"] for a in annotator_users[:2]] response = client.post( "/api/tasks/batch-assign", json={ "task_ids": test_tasks, "user_ids": user_ids, "mode": "equal" }, headers=headers ) assert response.status_code == 200 data = response.json() assert data["success_count"] == len(test_tasks) # 验证平均分配:每个用户分配的任务数差异不超过 1 user_task_counts = {} for assignment in data["assignments"]: user_id = assignment["assigned_to"] user_task_counts[user_id] = user_task_counts.get(user_id, 0) + 1 counts = list(user_task_counts.values()) assert max(counts) - min(counts) <= 1 def test_batch_assign_invalid_mode(self, admin_user, annotator_users, test_tasks): """无效分配模式应返回 400""" headers = {"Authorization": f"Bearer {admin_user['token']}"} response = client.post( "/api/tasks/batch-assign", json={ "task_ids": test_tasks[:2], "user_ids": [annotator_users[0]["user_id"]], "mode": "invalid_mode" }, headers=headers ) assert response.status_code == 400 class TestMyTasks: """当前用户任务列表测试""" def test_my_tasks_without_auth(self, setup_database): """未认证时应返回 401""" response = client.get("/api/tasks/my-tasks") assert response.status_code == 401 def test_my_tasks_empty(self, annotator_users): """没有分配任务时返回空列表""" headers = {"Authorization": f"Bearer {annotator_users[0]['token']}"} response = client.get("/api/tasks/my-tasks", headers=headers) assert response.status_code == 200 data = response.json() assert data["total"] == 0 assert data["tasks"] == [] def test_my_tasks_with_assignments(self, admin_user, annotator_users, test_tasks): """分配任务后可以看到自己的任务""" # 先分配任务 admin_headers = {"Authorization": f"Bearer {admin_user['token']}"} target_user = annotator_users[0] # 分配 2 个任务给第一个标注人员 for task_id in test_tasks[:2]: client.put( f"/api/tasks/{task_id}/assign", json={"user_id": target_user["user_id"]}, headers=admin_headers ) # 查询自己的任务 user_headers = {"Authorization": f"Bearer {target_user['token']}"} response = client.get("/api/tasks/my-tasks", headers=user_headers) assert response.status_code == 200 data = response.json() assert data["total"] == 2 assert len(data["tasks"]) == 2 # 验证所有任务都是分配给当前用户的 for task in data["tasks"]: assert task["assigned_to"] == target_user["user_id"] def test_my_tasks_only_shows_own_tasks(self, admin_user, annotator_users, test_tasks): """标注人员只能看到分配给自己的任务""" admin_headers = {"Authorization": f"Bearer {admin_user['token']}"} # 分配不同任务给不同用户 client.put( f"/api/tasks/{test_tasks[0]}/assign", json={"user_id": annotator_users[0]["user_id"]}, headers=admin_headers ) client.put( f"/api/tasks/{test_tasks[1]}/assign", json={"user_id": annotator_users[1]["user_id"]}, headers=admin_headers ) # 第一个用户只能看到自己的任务 user1_headers = {"Authorization": f"Bearer {annotator_users[0]['token']}"} response1 = client.get("/api/tasks/my-tasks", headers=user1_headers) data1 = response1.json() for task in data1["tasks"]: assert task["assigned_to"] == annotator_users[0]["user_id"] # 第二个用户只能看到自己的任务 user2_headers = {"Authorization": f"Bearer {annotator_users[1]['token']}"} response2 = client.get("/api/tasks/my-tasks", headers=user2_headers) data2 = response2.json() for task in data2["tasks"]: assert task["assigned_to"] == annotator_users[1]["user_id"]